From 50750146b1ccc1d87793522780ca21b4e376b2e1 Mon Sep 17 00:00:00 2001 From: Chris Sanchez Date: Fri, 26 Jun 2020 23:55:16 +0000 Subject: [PATCH] 'Version 1.5.2 of the DynamoDB Streams Kinesis Adapter' --- README.md | 6 ++++- pom.xml | 4 +-- .../streamsadapter/StreamsWorkerFactory.java | 14 +++++----- .../functionals/KinesisParametersTest.java | 27 +++++++++++++++++++ 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 9cb7718..6278294 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,11 @@ * The KCL is designed to process streams from Amazon Kinesis, but by adding the DynamoDB Streams Kinesis Adapter, your application can process DynamoDB Streams instead, seamlessly and efficiently. ## Release Notes -### Latest Release (v1.5.1) +### Latest Release (v1.5.2) +* Upgrades jackson-databind to version 2.9.10.5 +* Updates `StreamsWorkerFactory` to use `KinesisClientLibConfiguration` billing mode when constructing `KinesisClientLeaseManager`. + +### Release (v1.5.1) * Restores compile compatibility with KCL 1.13.3. * Fixes a performance issue that arised when using v1.5.0 with KCL 1.12 through 1.13.2. * Fixes a defect where `MaxLeasesForWorker` configuration was not being propagated to `StreamsLeaseTaker`. diff --git a/pom.xml b/pom.xml index 4a91349..06e1412 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ dynamodb-streams-kinesis-adapter jar DynamoDB Streams Adapter for Java - 1.5.1 + 1.5.2 The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream. https://aws.amazon.com/dynamodb @@ -62,7 +62,7 @@ com.fasterxml.jackson.core jackson-databind - 2.9.10.3 + 2.9.10.5 diff --git a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java index 87b499a..dee4884 100644 --- a/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java +++ b/src/main/java/com/amazonaws/services/dynamodbv2/streamsadapter/StreamsWorkerFactory.java @@ -50,7 +50,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP config.getDynamoDBClientConfiguration(), config.getDynamoDBEndpoint(), config.getRegionName()); - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -77,7 +77,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -107,7 +107,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient, ExecutorService execService) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -138,7 +138,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -167,7 +167,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -197,7 +197,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) @@ -228,7 +228,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP */ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config, AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) { - KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient); + KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode()); return new Worker .Builder() .recordProcessorFactory(recordProcessorFactory) diff --git a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/functionals/KinesisParametersTest.java b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/functionals/KinesisParametersTest.java index f96c38a..406c628 100644 --- a/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/functionals/KinesisParametersTest.java +++ b/src/test/java/com/amazonaws/services/dynamodbv2/streamsadapter/functionals/KinesisParametersTest.java @@ -8,6 +8,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.amazonaws.services.dynamodbv2.model.BillingMode; +import com.amazonaws.services.dynamodbv2.model.BillingModeSummary; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.junit.Test; @@ -79,4 +81,29 @@ public void numProcessRecordsCallsTest() throws Exception { // Atleast 1 and atmost 2 getRecords/processRecords calls should have been made assertTrue(numGetRecordsCalls > 0 && numGetRecordsCalls <= 3); } + + /** + * This test configures the worker with a non-default billing mode and ensures that the billing mode is passed + * through to the created lease table. + */ + @Test + public void billingModeTest() throws Exception { + KinesisClientLibConfiguration workerConfig = + new KinesisClientLibConfiguration(leaseTable, streamId, credentials, KCL_WORKER_ID) + .withBillingMode(BillingMode.PAY_PER_REQUEST); + + startKCLWorker(workerConfig); + + while (recordProcessorFactory.getNumRecordsProcessed() < 0) { + LOG.info("Sleep till RecordProcessor is initialized"); + Thread.sleep(THREAD_SLEEP_2S); + } + + shutDownKCLWorker(); + + DescribeTableResult describeTableResult = TestUtil.describeTable(dynamoDBClient, leaseTable); + TableDescription leaseTableDescription = describeTableResult.getTable(); + BillingModeSummary billingModeSummary = leaseTableDescription.getBillingModeSummary(); + assertEquals(BillingMode.PAY_PER_REQUEST.toString(), billingModeSummary.getBillingMode()); + } } \ No newline at end of file