Skip to content

Commit

Permalink
'Version 1.5.2 of the DynamoDB Streams Kinesis Adapter'
Browse files Browse the repository at this point in the history
  • Loading branch information
csan6 committed Jun 26, 2020
1 parent bdd2605 commit 5075014
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 10 deletions.
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
* The KCL is designed to process streams from Amazon Kinesis, but by adding the DynamoDB Streams Kinesis Adapter, your application can process DynamoDB Streams instead, seamlessly and efficiently.

## Release Notes
### Latest Release (v1.5.1)
### Latest Release (v1.5.2)
* Upgrades jackson-databind to version 2.9.10.5
* Updates `StreamsWorkerFactory` to use `KinesisClientLibConfiguration` billing mode when constructing `KinesisClientLeaseManager`.

### Release (v1.5.1)
* Restores compile compatibility with KCL 1.13.3.
* Fixes a performance issue that arised when using v1.5.0 with KCL 1.12 through 1.13.2.
* Fixes a defect where `MaxLeasesForWorker` configuration was not being propagated to `StreamsLeaseTaker`.
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<artifactId>dynamodb-streams-kinesis-adapter</artifactId>
<packaging>jar</packaging>
<name>DynamoDB Streams Adapter for Java</name>
<version>1.5.1</version>
<version>1.5.2</version>
<description>The DynamoDB Streams Adapter implements the AmazonKinesis interface so that your application can use KCL to consume and process data from a DynamoDB stream.</description>
<url>https://aws.amazon.com/dynamodb</url>

Expand Down Expand Up @@ -62,7 +62,7 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.3</version>
<version>2.9.10.5</version>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
config.getDynamoDBClientConfiguration(),
config.getDynamoDBEndpoint(),
config.getRegionName());
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand All @@ -77,7 +77,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down Expand Up @@ -107,7 +107,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, AmazonCloudWatch cloudWatchClient, ExecutorService execService) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down Expand Up @@ -138,7 +138,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDB dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down Expand Up @@ -167,7 +167,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down Expand Up @@ -197,7 +197,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, AmazonCloudWatchClient cloudWatchClient, ExecutorService execService) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down Expand Up @@ -228,7 +228,7 @@ public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordP
*/
public static Worker createDynamoDbStreamsWorker(IRecordProcessorFactory recordProcessorFactory, KinesisClientLibConfiguration config,
AmazonDynamoDBStreamsAdapterClient streamsClient, AmazonDynamoDBClient dynamoDBClient, IMetricsFactory metricsFactory, ExecutorService execService) {
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient);
KinesisClientLeaseManager kinesisClientLeaseManager = new KinesisClientLeaseManager(config.getTableName(), dynamoDBClient, config.getBillingMode());
return new Worker
.Builder()
.recordProcessorFactory(recordProcessorFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.amazonaws.services.dynamodbv2.model.BillingMode;
import com.amazonaws.services.dynamodbv2.model.BillingModeSummary;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.junit.Test;
Expand Down Expand Up @@ -79,4 +81,29 @@ public void numProcessRecordsCallsTest() throws Exception {
// Atleast 1 and atmost 2 getRecords/processRecords calls should have been made
assertTrue(numGetRecordsCalls > 0 && numGetRecordsCalls <= 3);
}

/**
* This test configures the worker with a non-default billing mode and ensures that the billing mode is passed
* through to the created lease table.
*/
@Test
public void billingModeTest() throws Exception {
KinesisClientLibConfiguration workerConfig =
new KinesisClientLibConfiguration(leaseTable, streamId, credentials, KCL_WORKER_ID)
.withBillingMode(BillingMode.PAY_PER_REQUEST);

startKCLWorker(workerConfig);

while (recordProcessorFactory.getNumRecordsProcessed() < 0) {
LOG.info("Sleep till RecordProcessor is initialized");
Thread.sleep(THREAD_SLEEP_2S);
}

shutDownKCLWorker();

DescribeTableResult describeTableResult = TestUtil.describeTable(dynamoDBClient, leaseTable);
TableDescription leaseTableDescription = describeTableResult.getTable();
BillingModeSummary billingModeSummary = leaseTableDescription.getBillingModeSummary();
assertEquals(BillingMode.PAY_PER_REQUEST.toString(), billingModeSummary.getBillingMode());
}
}

0 comments on commit 5075014

Please sign in to comment.