From e731cd699bb70bb41fefb82284769f3244784d04 Mon Sep 17 00:00:00 2001 From: markglh Date: Thu, 28 Dec 2017 20:51:29 +0000 Subject: [PATCH] Update KPL and KCL libraries (#44) * upgraded KCL * upgrade KPL * updated readme --- README.md | 5 + build.sbt | 4 +- src/main/resources/reference.conf | 64 ++++ .../kinesis/producer/ProducerConf.scala | 11 - .../kinesis/consumer/ConsumerConfSpec.scala | 324 ++++++++++++++++++ .../consumer/KinesisConsumerSpec.scala | 192 +---------- .../kinesis/producer/ProducerConfSpec.scala | 9 +- .../stream/KinesisSourceGraphSpec.scala | 3 + 8 files changed, 415 insertions(+), 197 deletions(-) create mode 100644 src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerConfSpec.scala diff --git a/README.md b/README.md index 5ddb277..21b1cd2 100644 --- a/README.md +++ b/README.md @@ -35,6 +35,7 @@ It's worth familiarising yourself with [Sequence numbers and Sub sequence number * [Valid Release Tag Examples:](#contributor-guide-tag-requirements-valid-release-tag-examples) * [Invalid Release Tag Examples:](#contributor-guide-tag-requirements-invalid-release-tag-examples) * [Contribution policy](#contribution-policy) +* [Changelog](#changelog) * [License](#license) @@ -596,6 +597,10 @@ explicitly, by submitting any copyrighted material via pull request, email, or o agree to license the material under the project's open source license and warrant that you have the legal authority to do so. + +# Changelog +See the releases tab: https://github.com/WW-Digital/reactive-kinesis/releases + # License diff --git a/build.sbt b/build.sbt index b3efe11..bc1c020 100644 --- a/build.sbt +++ b/build.sbt @@ -40,11 +40,11 @@ lazy val library = "com.fasterxml.uuid" % "java-uuid-generator" % "3.1.4" % Compile) val amazon = Seq( - "com.amazonaws" % "amazon-kinesis-client" % "1.8.1" % Compile + "com.amazonaws" % "amazon-kinesis-client" % "1.8.8" % Compile excludeAll( ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule(organization = "com.fasterxml.jackson.dataformat")), - "com.amazonaws" % "amazon-kinesis-producer" % "0.12.5" % Compile + "com.amazonaws" % "amazon-kinesis-producer" % "0.12.8" % Compile excludeAll( ExclusionRule(organization = "com.fasterxml.jackson.core"), ExclusionRule(organization = "com.fasterxml.jackson.dataformat")) diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index 393f0f5..b5b9737 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -652,6 +652,70 @@ kinesis { # Default: no timeout #timeoutInSeconds = + # The amount of milliseconds to wait before graceful shutdown forcefully terminates the process. + # Default: 5000 + #shutdownGraceMillis = 5000 + + # Internally support timeouts and retries for GetRecords calls. + # + # If retryGetRecordsInSeconds is set + # And maxGetRecordsThreadPool is set + # Then getRecords will asynchronously retry internally using a CompletionService of + # max size "maxGetRecordsThreadPool" with "retryGetRecordsInSeconds" between each retry. + # + # NOTE: this enables the AsynchronousGetRecordsRetrievalStrategy for getRecords + # + # Time in seconds to wait before the worker retries to get a record + # Default: Optional value, default not set + #retryGetRecordsInSeconds = 1 + # + # max number of threads in the getRecords thread pool (per shard) + # Default: Optional value, default not set + #maxGetRecordsThreadPool = 2 + + + ####################### + # Pre-fetching config # + ####################### + # Pre-fetching will retrieve and queue additional records from Kinesis while the + # application is processing existing records. + # Pre-fetching can be enabled by setting dataFetchingStrategy to PREFETCH_CACHED. Once + # enabled an additional fetching thread will be started to retrieve records from Kinesis. + # Retrieved records will be held in a queue until the application is ready to process them. + + # Which data fetching strategy to use (DEFAULT, PREFETCH_CACHED) + # Default: DEFAULT + dataFetchingStrategy = PREFETCH_CACHED + + # + # Pre-fetching supports the following configuration values: + # + + # The maximum number of process records input that can be queued + # Default: 3 + #maxPendingProcessRecordsInput = 3 + + # The maximum number of bytes that can be queued + # Default 8388608 (8 * 1024 * 1024 / 8Mb) + #maxCacheByteSize = 8388608 + + # The maximum number of records that can be queued + # Default: 30000 + #maxRecordsCount = 30000 + + # The amount of time to wait between calls to Kinesis + # Default: 1500 + #idleMillisBetweenCalls = 1500 + + ############################## + # End of Pre-fetching config # + ############################## + + + # Milliseconds after which the logger will log a warning message for the long running task + # Default: not set + #logWarningForTaskAfterMillis = 100 + } } } diff --git a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala index 19a02f5..50ff3cd 100644 --- a/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala +++ b/src/main/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConf.scala @@ -96,17 +96,6 @@ object ProducerConf { KinesisProducerConfiguration.fromProperties(kplProps) credentialsProvider.foreach(kplLibConfiguration.setCredentialsProvider) - //TODO, this should be part of the KPL. The KCL would handle enums and ints and let us use props directly. - //TODO can be removed once this is merged: https://github.com/awslabs/amazon-kinesis-producer/pull/134 - if (kplConfig.hasPath("ThreadingModel")) { - kplLibConfiguration.setThreadingModel( - ThreadingModel.valueOf(kplConfig.getString("ThreadingModel")) - ) - } - if (kplConfig.hasPath("ThreadPoolSize")) { - kplLibConfiguration.setThreadPoolSize(kplConfig.getInt("ThreadPoolSize")) - } - kplLibConfiguration } diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerConfSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerConfSpec.scala new file mode 100644 index 0000000..c220ec8 --- /dev/null +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/ConsumerConfSpec.scala @@ -0,0 +1,324 @@ +/* + * Copyright 2017 WeightWatchers + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.weightwatchers.reactive.kinesis.consumer + +import java.io.File + +import akka.actor.ActorSystem +import akka.testkit.{ImplicitSender, TestKit} +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf +import org.scalatest._ +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.mockito.MockitoSugar +import org.scalatest.time.{Millis, Seconds, Span} + +import scala.concurrent.duration.DurationInt + +//scalastyle:off magic.number +class ConsumerConfSpec + extends TestKit(ActorSystem("consumer-conf-spec")) + with ImplicitSender + with FreeSpecLike + with Matchers + with MockitoSugar + with BeforeAndAfterAll + with GivenWhenThen + with ScalaFutures { + + val defaultKinesisConfig = + ConfigFactory.parseFile(new File("src/main/resources/reference.conf")).getConfig("kinesis") + + implicit val defaultPatience = + PatienceConfig(timeout = Span(3, Seconds), interval = Span(50, Millis)) + + val kinesisConfig = ConfigFactory + .parseString( + """ + |kinesis { + | + | application-name = "TestSpec" + | + | test-consumer-config { + | stream-name = "some-other-stream" + | + | worker { + | batchTimeoutSeconds = 1234 + | gracefulShutdownHook = false + | shutdownTimeoutSeconds = 2 + | } + | + | checkpointer { + | backoffMillis = 4321 + | } + | + | kcl { + | AWSCredentialsProvider = DefaultAWSCredentialsProviderChain + | + | regionName = us-east-2 + | + | # Default: LATEST + | initialPositionInStream = TRIM_HORIZON + | + | # Default = 10000 + | maxRecords = 20000 + | + | # Default = 1000 + | idleTimeBetweenReadsInMillis = 1234 + | + | # Default: 10000 + | failoverTimeMillis = 11000 + | + | # Default: 60000 + | shardSyncIntervalMillis = 70000 + | + | # Default: true + | cleanupLeasesUponShardCompletion = false + | + | # Default: true + | validateSequenceNumberBeforeCheckpointing = false + | + | # Default: null + | kinesisEndpoint = "https://kinesis" + | + | # Default: null + | dynamoDBEndpoint = "https://dynamo" + | + | # Default: false + | callProcessRecordsEvenForEmptyRecordList = true + | + | # Default: 10000 + | parentShardPollIntervalMillis = 40000 + | + | # Default: 500 + | taskBackoffTimeMillis = 600 + | + | # Default: 10000 + | metricsBufferTimeMillis = 10001 + | + | + | # Default: 10000 + | metricsMaxQueueSize = 10009 + | + | + | # Default: DETAILED + | metricsLevel = NONE + | + | + | # Default: Operation, ShardId + | metricsEnabledDimensions = Operation + | + | + | # Default: 2147483647 (Integer.MAX_VALUE) + | maxLeasesForWorker = 11111111 + | + | + | # Default: 1 + | maxLeasesToStealAtOneTime = 2 + | + | + | # Default: 10 + | initialLeaseTableReadCapacity = 15 + | + | + | # Default: 10 + | initialLeaseTableWriteCapacity = 14 + | + | # Default: false + | skipShardSyncAtStartupIfLeasesExist=true + | + | + | # Default: + | userAgent = testy123 + | + | # Default = + | tableName = meh + | + | # Default: 20 + | maxLeaseRenewalThreads=9 + | + | + | # Default: no timeout + | timeoutInSeconds = 10 + | + | + | # The amount of milliseconds to wait before graceful shutdown forcefully terminates. + | # Default: 5000 + | shutdownGraceMillis = 2500 + | + | # If retryGetRecordsInSeconds is set + | # And maxGetRecordsThreadPool is set + | # Then use getRecords will asynchronously retry internally using a CompletionService of + | # max size maxGetRecordsThreadPool with "retryGetRecordsInSeconds" between each retry + | # + | # Time in seconds to wait before the worker retries to get a record + | # Default: Optional value, default not set + | retryGetRecordsInSeconds = 2 + | + | # max number of threads in the getRecords thread pool + | # Default: Optional value, default not set + | maxGetRecordsThreadPool = 1 + | + | # + | # Pre-fetching config + | # + | + | # Pre-fetching will retrieve and queue additional records from Kinesis while the + | # application is processing existing records. + | # Pre-fetching can be enabled by setting dataFetchingStrategy to PREFETCH_CACHED. Once + | # enabled an additional fetching thread will be started to retrieve records from Kinesis. + | # Retrieved records will be held in a queue until the application is ready to process them. + | + | # Which data fetching strategy to use (DEFAULT, PREFETCH_CACHED) + | # Default: DEFAULT + | dataFetchingStrategy = DEFAULT + | + | # + | # Pre-fetching supports the following configuration values: + | # + | + | # The maximum number of process records input that can be queued + | # Default: 3 + | maxPendingProcessRecordsInput = 3 + | + | # The maximum number of bytes that can be queued + | # Default 8388608 (8 * 1024 * 1024 / 8Mb) + | maxCacheByteSize = 8388608 + | + | # The maximum number of records that can be queued + | # Default: 30000 + | maxRecordsCount = 30000 + | + | # The amount of time to wait between calls to Kinesis + | # Default: 1500 + | idleMillisBetweenCalls = 1500 + | + | # + | # End of Pre-fetching config + | # + | + | # Milliseconds after which the logger will log a warning message for the long running task + | # Default: not set + | logWarningForTaskAfterMillis = 100 + | } + | + | } + |} + """.stripMargin + ) + .getConfig("kinesis") + .withFallback(defaultKinesisConfig) + + "The Consumerconf" - { + + "Should parse test-consumer-config into a ConsumerConf, setting all properties in the KinesisClientLibConfiguration" in { + // This will fail when fields are added or renamed in the KCL + // + // The properties are automatically tested reflectively against the underlying implementation + // First we load each property reflectively + // Then we parse and try to load it from our config + // If it isn't present we fail the test + + // We reflectively load fields, but setters are used in the implementation. + // However some setters don't match the field names (?!?), this renames them on the fly. + val confToFieldConversions = Map( + "skipShardSyncAtStartupIfLeasesExist" -> "skipShardSyncAtWorkerInitializationIfLeasesExist", + "maxCacheByteSize" -> "maxByteSize" + ) + + // RecordsFetcherFactory is a nested object in the configurator, so we define it's fields manually as a hack + val recordFetcherFields = List( + "maxPendingProcessRecordsInput", + "maxCacheByteSize", + "maxRecordsCount", + "idleMillisBetweenCalls", + "dataFetchingStrategy" + ) + + // Some fields dion't have setters / aren't configurable + val fieldsToSkip = List( + "useragent", //this gets nested internally + "streamname", + "timestampatinitialpositioninstream", + "commonclientconfig", + "shardprioritizationstrategy", + "kinesisclientconfig", + "dynamodbclientconfig", + "cloudwatchclientconfig", + "credentialsprovider", //these must be tested individually + "applicationname" + ) + + val consumerConf = ConsumerConf(kinesisConfig, "test-consumer-config") + + consumerConf.workerConf.batchTimeout should be(1234.seconds) + consumerConf.workerConf.failedMessageRetries should be(1) + consumerConf.workerConf.failureTolerancePercentage should be(0.25) + consumerConf.workerConf.shutdownHook should be(false) + consumerConf.workerConf.shutdownTimeout should be(Timeout(2.seconds)) + consumerConf.checkpointerConf.backoff should be(4321.millis) + consumerConf.checkpointerConf.interval should be(2000.millis) //reference default + consumerConf.checkpointerConf.notificationDelay should be(1000.millis) //reference default + consumerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher")) + consumerConf.kclConfiguration.getApplicationName should be( + "TestSpec-some-other-stream" + ) + + val kclConfig = kinesisConfig.getConfig("test-consumer-config.kcl") + val kclLibConfiguration = consumerConf.kclConfiguration + val recordsFetcherFactory = kclLibConfiguration.getRecordsFetcherFactory + + //We're dealing with Java classes so using Java reflection is cleaner here + //Start with the setters to prevent picking up all the unrelated private fields, stripping the "with" + val configKeys = kclLibConfiguration.getClass.getDeclaredMethods + .filter(_.getName.startsWith("with")) + .map(_.getName.drop(4)) + .map(field => field.head.toLower + field.tail) + .filterNot( + field => fieldsToSkip.contains(field.toLowerCase) + ) + + configKeys foreach { configKey => + //Hack to deal with the nested objects + //The "with" setters live on the kclLibConfiguration, but they defer and set the value on the nested object + //We need to know which object to assert, so we use a pre-defined list + //TODO we could do this automatically if we used getFields to load all fields and traversed down, combining the lists + val obj = + if (recordFetcherFields.contains(configKey)) recordsFetcherFactory + else kclLibConfiguration + + val field = + obj.getClass.getDeclaredField( + confToFieldConversions.getOrElse(configKey, configKey) + ) + field.setAccessible(true) + + withClue( + s"Property `$configKey` was missing or incorrect when asserting the KCL configuration - possibly a newly added KCL property: " + //The property should be defined in the test config above, and also in the reference.conf with full description and defaults - commented out. + ) { + kclConfig.hasPath(configKey) should be(true) + field.get(obj).toString should include(kclConfig.getString(configKey)) + } + } + + } + } +} + +//scalastyle:on diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala index b5e918e..a72008d 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/consumer/KinesisConsumerSpec.scala @@ -58,7 +58,8 @@ class KinesisConsumerSpec PatienceConfig(timeout = Span(3, Seconds), interval = Span(50, Millis)) val kinesisConfig = ConfigFactory - .parseString(""" + .parseString( + """ |kinesis { | | application-name = "TestSpec" @@ -104,112 +105,9 @@ class KinesisConsumerSpec | } | } | - | testConsumer-3 { - | stream-name = "some-other-stream" - | - | worker { - | batchTimeoutSeconds = 1234 - | gracefulShutdownHook = false - | shutdownTimeoutSeconds = 2 - | } - | - | checkpointer { - | backoffMillis = 4321 - | } - | - | kcl { - | AWSCredentialsProvider = DefaultAWSCredentialsProviderChain - | - | regionName = us-east-2 - | - | # Default: LATEST - | initialPositionInStream = TRIM_HORIZON - | - | # Default = 10000 - | maxRecords = 20000 - | - | # Default = 1000 - | idleTimeBetweenReadsInMillis = 1234 - | - | # Default: 10000 - | failoverTimeMillis = 11000 - | - | # Default: 60000 - | shardSyncIntervalMillis = 70000 - | - | # Default: true - | cleanupLeasesUponShardCompletion = false - | - | # Default: true - | validateSequenceNumberBeforeCheckpointing = false - | - | # Default: null - | kinesisEndpoint = "https://kinesis" - | - | # Default: null - | dynamoDBEndpoint = "https://dynamo" - | - | # Default: false - | callProcessRecordsEvenForEmptyRecordList = true - | - | # Default: 10000 - | parentShardPollIntervalMillis = 40000 - | - | # Default: 500 - | taskBackoffTimeMillis = 600 - | - | # Default: 10000 - | metricsBufferTimeMillis = 10001 - | - | - | # Default: 10000 - | metricsMaxQueueSize = 10009 - | - | - | # Default: DETAILED - | metricsLevel = NONE - | - | - | # Default: Operation, ShardId - | metricsEnabledDimensions = Operation - | - | - | # Default: 2147483647 (Integer.MAX_VALUE) - | maxLeasesForWorker = 11111111 - | - | - | # Default: 1 - | maxLeasesToStealAtOneTime = 2 - | - | - | # Default: 10 - | initialLeaseTableReadCapacity = 15 - | - | - | # Default: 10 - | initialLeaseTableWriteCapacity = 14 - | - | # Default: false - | skipShardSyncAtStartupIfLeasesExist=true - | - | - | # Default: - | userAgent = testy123 - | - | # Default = - | tableName = meh - | - | # Default: 20 - | maxLeaseRenewalThreads=9 - | - | - | # Default: no timeout - | timeoutInSeconds = 10 - | } - | - | } |} - """.stripMargin) + """.stripMargin + ) .getConfig("kinesis") .withFallback(defaultKinesisConfig) @@ -225,7 +123,7 @@ class KinesisConsumerSpec .createProcessor() shouldBe a[ConsumerProcessingManager] } - def assertConsumer1Config(): Assertion = { + "Should parse the Config into a ConsumerConf for a single consumer" in { val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-1") consumerConf.workerConf.batchTimeout should be(1234.seconds) @@ -241,12 +139,12 @@ class KinesisConsumerSpec "TestSpec-test-kinesis-reliability" ) consumerConf.kclConfiguration.getStreamName should be("test-kinesis-reliability") - consumerConf.kclConfiguration.getKinesisEndpoint should be("CustomKinesisEndpoint") //validate an override property - consumerConf.kclConfiguration.getDynamoDBEndpoint should be("CustomDynamoDBEndpoint") //validate an override property + consumerConf.kclConfiguration.getKinesisEndpoint should be("CustomKinesisEndpoint") + consumerConf.kclConfiguration.getDynamoDBEndpoint should be("CustomDynamoDBEndpoint") consumerConf.kclConfiguration.getSkipShardSyncAtWorkerInitializationIfLeasesExist should be( true - ) //validate an override property - consumerConf.kclConfiguration.getTableName should be("TableName") //validate an override property + ) + consumerConf.kclConfiguration.getTableName should be("TableName") val credentialsProvider = consumerConf.kclConfiguration.getKinesisCredentialsProvider .asInstanceOf[AWSCredentialsProviderChain] @@ -261,79 +159,7 @@ class KinesisConsumerSpec consumerConf.kclConfiguration.getRegionName should be("us-east-1") } - "Should parse the Config into a ConsumerConf for a single consumer" in { - assertConsumer1Config() - } - - "Should parse consumer 3 the Config into a ConsumerConf, setting all properties in the KinesisClientLibConfiguration" in { - //This will fail when fields are added or renamed in the KCL - - // Some setters don't match the field names. - val confToFieldConversions = Map( - "skipShardSyncAtStartupIfLeasesExist" -> "skipShardSyncAtWorkerInitializationIfLeasesExist" - ) - - val fieldsToSkip = List( - "useragent", //this gets nested internally - "streamname", - "timestampatinitialpositioninstream", - "commonclientconfig", - "shardprioritizationstrategy", - "kinesisclientconfig", - "dynamodbclientconfig", - "cloudwatchclientconfig", - "credentialsprovider", //these must be tested individually - "applicationname" - ) - - val consumerConf = ConsumerConf(kinesisConfig, "testConsumer-3") - - consumerConf.workerConf.batchTimeout should be(1234.seconds) - consumerConf.workerConf.failedMessageRetries should be(1) - consumerConf.workerConf.failureTolerancePercentage should be(0.25) - consumerConf.workerConf.shutdownHook should be(false) - consumerConf.workerConf.shutdownTimeout should be(Timeout(2.seconds)) - consumerConf.checkpointerConf.backoff should be(4321.millis) - consumerConf.checkpointerConf.interval should be(2000.millis) //reference default - consumerConf.checkpointerConf.notificationDelay should be(1000.millis) //reference default - consumerConf.dispatcher should be(Some("kinesis.akka.default-dispatcher")) - consumerConf.kclConfiguration.getApplicationName should be( - "TestSpec-some-other-stream" - ) - - val kclConfig = kinesisConfig.getConfig("testConsumer-3.kcl") - val kclLibConfiguration = consumerConf.kclConfiguration - - //We're dealing with Java classes so using Java reflection is cleaner here - //Start with the setters to prevent picking up all the unrelated private fields, stripping the "with" - val configKeys = kclLibConfiguration.getClass.getDeclaredMethods - .filter(_.getName.startsWith("with")) - .map(_.getName.drop(4)) - .map(field => field.head.toLower + field.tail) - .filterNot( - field => fieldsToSkip.contains(field.toLowerCase) - ) - - configKeys foreach { configKey => - val field = - kclLibConfiguration.getClass.getDeclaredField( - confToFieldConversions.getOrElse(configKey, configKey) - ) - field.setAccessible(true) - - withClue( - s"Property `$configKey` was not as expected when asserting the KCL configuration: " - ) { - kclConfig.hasPath(configKey) should be(true) - field.get(kclLibConfiguration).toString should include(kclConfig.getString(configKey)) - } - } - - } - "Should parse the Config into multiple ConsumerConf objects for multiple consumers" in { - assertConsumer1Config() - val consumerConf2 = ConsumerConf(kinesisConfig, "testConsumer-2") consumerConf2.workerConf.batchTimeout should be(10.seconds) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala index 0407fd0..76b28cb 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/producer/ProducerConfSpec.scala @@ -211,6 +211,13 @@ class ProducerConfSpec "The ProducerConf" - { "Should parse the Config into a ProducerConf, setting all properties in the KinesisProducerConfiguration" in { + // This will fail when fields are added or renamed in the KPL + // + // The properties are automatically tested reflectively against the underlying implementation + // First we load each property reflectively + // Then we parse and try to load it from our config + // If it isn't present we fail the test + val producerConf = ProducerConf(kinesisConfig, "testProducer") producerConf.streamName should be("core-test-kinesis-producer") @@ -238,7 +245,7 @@ class ProducerConfSpec field.setAccessible(true) withClue( - s"Property `$configKey` was not as expected when asserting the KPL configuration: " + s"Property `$configKey` was missing or incorrect when asserting the KPL configuration - possibly a newly added KPL property: " ) { kplConfig.hasPath(configKey) should be(true) field.get(kplLibConfiguration).toString should be(kplConfig.getString(configKey)) diff --git a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala index 9a0c347..00eb9da 100644 --- a/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala +++ b/src/test/scala/com/weightwatchers/reactive/kinesis/stream/KinesisSourceGraphSpec.scala @@ -36,6 +36,7 @@ import com.weightwatchers.reactive.kinesis.consumer.KinesisConsumer.ConsumerConf import com.weightwatchers.reactive.kinesis.models.{CompoundSequenceNumber, ConsumerEvent} import org.joda.time.DateTime import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{Millis, Seconds, Span} import org.scalatest.{BeforeAndAfterAll, FreeSpecLike, Matchers} import scala.concurrent.duration._ @@ -51,6 +52,8 @@ class KinesisSourceGraphSpec implicit val materializer: Materializer = ActorMaterializer() implicit val ec = system.dispatcher + implicit val defaultPatience = + PatienceConfig(timeout = Span(3, Seconds), interval = Span(50, Millis)) "KinesisSourceGraph" - {