Skip to content

Commit c35e7a5

Browse files
authored
Table whitelist config parameter (#7)
integration tests - updated kinesis adapter - service endpoints config - fixed credential config types updated kinesis-adapter use gson only integration test task test switch from init sync table whitelist config parameter kcl.table.billing.mode config parameter add aws-java-sdk-sts dependency #5 params cleaning cleanup docs
1 parent d319c32 commit c35e7a5

24 files changed

+953
-134
lines changed

Diff for: README.md

+5-3
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ A [Kafka Connector](http://kafka.apache.org/documentation.html#connect) which im
77
## Notable features
88
* `autodiscovery` - monitors and automatically discovers DynamoDB tables to start/stop syncing from (based on AWS TAG's)
99
* `initial sync` - automatically detects and if needed performs initial(existing) data replication before tracking changes from the DynamoDB table stream
10-
10+
* `local debugging` - use of test containers to test full connector life-cycle
1111
## Alternatives
1212

1313
Prior our development we found only one existing implementation by [shikhar](https://github.com/shikhar/kafka-connect-dynamodb), but it seems to be missing major features (initial sync, handling shard changes) and is no longer supported.
@@ -22,7 +22,7 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
2222
* Gradlew 5.3.1
2323
* Kafka Connect Framework >= 2.1.1
2424
* Amazon Kinesis Client 1.9.1
25-
* DynamoDB Streams Kinesis Adapter 1.4.0
25+
* DynamoDB Streams Kinesis Adapter 1.5.2
2626

2727
## Documentation
2828
* [Getting started](docs/getting-started.md)
@@ -97,6 +97,9 @@ In our implementation we opted to use Amazon Kinesis Client with DynamoDB Stream
9797
# build & run unit tests
9898
./gradlew
9999

100+
# run integration tests
101+
./gradlew integrationTests
102+
100103
# build final jar
101104
./gradlew shadowJar
102105
```
@@ -128,7 +131,6 @@ Releases are done by creating new release(aka tag) via Github user interface. On
128131

129132
## Roadmap (TODO: move to issues?)
130133

131-
* Add Confluent stack as docker-compose.yml for easier local debugging
132134
* Use multithreaded DynamoDB table scan for faster `INIT SYNC`
133135

134136

Diff for: build.gradle

+32-2
Original file line numberDiff line numberDiff line change
@@ -45,15 +45,27 @@ allprojects {
4545

4646

4747
dependencies {
48-
testImplementation('org.junit.jupiter:junit-jupiter:5.4.1')
49-
testCompile ("org.junit.jupiter:junit-jupiter-params:5.3.2")
48+
def junitJupiterVersion = '5.6.2'
49+
testImplementation "org.junit.jupiter:junit-jupiter:$junitJupiterVersion"
50+
testCompile "org.junit.jupiter:junit-jupiter-api:$junitJupiterVersion"
51+
testCompile "org.junit.jupiter:junit-jupiter-params:$junitJupiterVersion"
52+
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:$junitJupiterVersion"
53+
implementation 'io.rest-assured:rest-assured:4.3.1'
54+
testCompile "org.testcontainers:testcontainers:1.14.3"
55+
testCompile "org.testcontainers:junit-jupiter:1.14.3"
56+
testCompile "org.testcontainers:kafka:1.15.0-rc2"
57+
testCompile "org.testcontainers:mockserver:1.15.0-rc2"
58+
testCompile "org.mock-server:mockserver-client-java:5.11.1"
59+
testCompile "com.google.code.gson:gson:2.8.6"
60+
5061
testCompile group: 'org.mockito', name: 'mockito-junit-jupiter', version: '2.26.0'
5162
compile group: 'org.apache.logging.log4j', name: 'log4j-api', version: '2.11.2'
5263
compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.11.2'
5364
compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.11.2'
5465
}
5566

5667
test {
68+
exclude '**/**IT**'
5769
useJUnitPlatform()
5870
testLogging {
5971
outputs.upToDateWhen {false}
@@ -66,8 +78,26 @@ allprojects {
6678
}
6779
}
6880
}
81+
82+
task integrationTests(type: Test) {
83+
dependsOn shadowJar
84+
useJUnitPlatform()
85+
include '**/**IT**'
86+
testLogging {
87+
outputs.upToDateWhen {false}
88+
events = ["passed", "failed", "skipped"]
89+
showStandardStreams = true
90+
afterSuite { desc, result ->
91+
if (!desc.parent) { // will match the outermost suite
92+
println "Results: ${result.resultType} (${result.testCount} tests, ${result.successfulTestCount} successes, ${result.failedTestCount} failures, ${result.skippedTestCount} skipped)"
93+
}
94+
}
95+
}
96+
}
6997
}
7098

99+
100+
71101
dependencies {
72102
compile project(':source')
73103
}

Diff for: docs/details.md

+2-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ This connector can sync multiple DynamoDB tables at the same time and it does so
88
* environment TAG key and value set
99
* DynamoDB streams enabled (in `new_image` or `new_and_old_image` mode)
1010

11-
11+
> Note: if `dynamodb.table.whitelist` parameter is set, then auto-discovery will not be executed and replication will be issued for explicitly defined tables.
1212
### 2. "INIT_SYNC"
1313

1414
`INIT_SYNC` is a process when all existing table data is scanned and pushed into Kafka destination topic. Usually this happens only once after source task for specific table is started for the first time. But it can be repeated in case of unexpected issues, e.g. if source connector was down for long period of time and it is possible that it has missed some of the change events from the table stream (DynamoDB streams store data for 24 hours only).
@@ -40,7 +40,7 @@ Since we are using two different frameworks/libraries together there are two dif
4040
4141
### `DISCOVERY` state and task configuration
4242

43-
Connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if environment TAG is matched and streams are actually enabled. Connect task is started for each table which meats all requirements.
43+
If `dynamodb.table.whitelist` parameter is not defined connector uses AWS resource group API to receive a list of DynamoDB tables which have ingestion TAG defined. Then it iterates over this list and checks if environment TAG is matched and streams are actually enabled. Connect task is started for each table which meats all requirements.
4444

4545
`discovery` phase is executed on start and every 60 seconds(default config value) after initial start.
4646

Diff for: docs/options.md

+11-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,13 @@
2424
"dynamodb.table.env.tag.key": "environment",
2525
"dynamodb.table.env.tag.value": "dev",
2626
"dynamodb.table.ingestion.tag.key": "datalake-ingest",
27+
"dynamodb.table.whitelist": "",
28+
"dynamodb.service.endpoint": "",
2729

30+
"kcl.table.billing.mode": "PROVISIONED",
31+
32+
"resource.tagging.service.endpoint": "",
33+
2834
"kafka.topic.prefix": "dynamodb-",
2935
"tasks.max": "1",
3036

@@ -44,8 +50,12 @@
4450

4551
`init.sync.delay.period` - time interval in seconds. Defines how long `INIT_SYNC` should delay execution before starting. This is used to give time for Kafka Connect tasks to calm down after rebalance (Since multiple tasks rebalances can happen in quick succession and this would mean more duplicated data since `INIT_SYNC` process won't have time mark it's progress).
4652

47-
`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.
53+
`connect.dynamodb.rediscovery.period` - time interval in milliseconds. Defines how often connector should try to find new DynamoDB tables (or detect removed ones). If changes are found tasks are automatically reconfigured.
4854

55+
`dynamodb.service.endpoint` - AWS DynamoDB API Endpoint. Will use default AWS if not set.
4956

57+
`resource.tagging.service.endpoint` - AWS Resource Group Tag API Endpoint. Will use default AWS if not set.
5058

59+
`kcl.table.billing.mode` - Define billing mode for internal table created by the KCL library. Default is provisioned.
5160

61+
`dynamodb.table.whitelist` - Define whitelist of dynamodb table names. This overrides table auto-discovery by ingestion tag.

Diff for: source/build.gradle

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@ dependencies {
66

77
compile group: 'org.apache.kafka', name: 'connect-api', version: "${rootProject.ext.kafkaConnectApiVersion}"
88
compile group: 'com.amazonaws', name: 'amazon-kinesis-client', version: '1.9.1'
9-
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.4.0'
9+
compile group: 'com.amazonaws', name: 'dynamodb-streams-kinesis-adapter', version: '1.5.2'
10+
compile group: 'com.amazonaws', name: 'aws-java-sdk-sts', version: '1.11.877'
1011
}

Diff for: source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnector.java

+18-11
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@
22

33
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
44
import com.amazonaws.services.resourcegroupstaggingapi.AWSResourceGroupsTaggingAPI;
5-
import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider;
65
import com.trustpilot.connector.dynamodb.aws.AwsClients;
6+
import com.trustpilot.connector.dynamodb.aws.ConfigTablesProvider;
7+
import com.trustpilot.connector.dynamodb.aws.DynamoDBTablesProvider;
78
import com.trustpilot.connector.dynamodb.aws.TablesProvider;
89
import org.apache.kafka.common.config.ConfigDef;
910
import org.apache.kafka.connect.connector.ConnectorContext;
@@ -54,20 +55,26 @@ public void start(Map<String, String> properties) {
5455

5556
AWSResourceGroupsTaggingAPI groupsTaggingAPIClient =
5657
AwsClients.buildAWSResourceGroupsTaggingAPIClient(config.getAwsRegion(),
57-
config.getAwsAccessKeyId(),
58-
config.getAwsSecretKey());
58+
config.getResourceTaggingServiceEndpoint(),
59+
config.getAwsAccessKeyIdValue(),
60+
config.getAwsSecretKeyValue());
5961

6062
AmazonDynamoDB dynamoDBClient = AwsClients.buildDynamoDbClient(config.getAwsRegion(),
61-
config.getAwsAccessKeyId(),
62-
config.getAwsSecretKey());
63+
config.getDynamoDBServiceEndpoint(),
64+
config.getAwsAccessKeyIdValue(),
65+
config.getAwsSecretKeyValue());
6366

6467
if (tablesProvider == null) {
65-
tablesProvider = new DynamoDBTablesProvider(
66-
groupsTaggingAPIClient,
67-
dynamoDBClient,
68-
config.getSrcDynamoDBIngestionTagKey(),
69-
config.getSrcDynamoDBEnvTagKey(),
70-
config.getSrcDynamoDBEnvTagValue());
68+
if (config.getWhitelistTables() != null) {
69+
tablesProvider = new ConfigTablesProvider(dynamoDBClient, config);
70+
} else {
71+
tablesProvider = new DynamoDBTablesProvider(
72+
groupsTaggingAPIClient,
73+
dynamoDBClient,
74+
config.getSrcDynamoDBIngestionTagKey(),
75+
config.getSrcDynamoDBEnvTagKey(),
76+
config.getSrcDynamoDBEnvTagValue());
77+
}
7178
}
7279

7380
startBackgroundReconfigurationTasks(this.context, config.getRediscoveryPeriod());

Diff for: source/src/main/java/com/trustpilot/connector/dynamodb/DynamoDBSourceConnectorConfig.java

+90-7
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package com.trustpilot.connector.dynamodb;
22

3+
import com.amazonaws.services.dynamodbv2.model.BillingMode;
34
import org.apache.kafka.common.config.AbstractConfig;
45
import org.apache.kafka.common.config.ConfigDef;
6+
import org.apache.kafka.common.config.types.Password;
57

8+
import java.util.List;
69
import java.util.Map;
710

811
public class DynamoDBSourceConnectorConfig extends AbstractConfig {
@@ -22,12 +25,12 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
2225
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
2326
public static final String AWS_ACCESS_KEY_ID_DOC = "Explicit AWS access key ID. Leave empty to utilize the default credential provider chain.";
2427
public static final String AWS_ACCESS_KEY_ID_DISPLAY = "Access key id";
25-
public static final Object AWS_ACCESS_KEY_ID_DEFAULT = null;
28+
public static final Password AWS_ACCESS_KEY_ID_DEFAULT = null;
2629

2730
public static final String AWS_SECRET_KEY_CONFIG = "aws.secret.key";
2831
public static final String AWS_SECRET_KEY_DOC = "Explicit AWS secret access key. Leave empty to utilize the default credential provider chain.";
2932
public static final String AWS_SECRET_KEY_DISPLAY = "Secret key";
30-
public static final Object AWS_SECRET_KEY_DEFAULT = null;
33+
public static final Password AWS_SECRET_KEY_DEFAULT = null;
3134

3235
public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_CONFIG = "dynamodb.table.ingestion.tag.key";
3336
public static final String SRC_DYNAMODB_TABLE_INGESTION_TAG_KEY_DOC = "Define DynamoDB table tag name. Only tables with this tag key will be ingested.";
@@ -44,6 +47,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
4447
public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY = "Environment";
4548
public static final String SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DEFAULT = "dev";
4649

50+
public static final String SRC_DYNAMODB_TABLE_WHITELIST_CONFIG = "dynamodb.table.whitelist";
51+
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DOC = "Define whitelist of dynamodb table names. This overrides table auto-discovery by ingestion tag.";
52+
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY = "Tables whitelist";
53+
public static final String SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT = null;
54+
55+
public static final String SRC_KCL_TABLE_BILLING_MODE_CONFIG = "kcl.table.billing.mode";
56+
public static final String SRC_KCL_TABLE_BILLING_MODE_DOC = "Define billing mode for internal table created by the KCL library. Default is provisioned.";
57+
public static final String SRC_KCL_TABLE_BILLING_MODE_DISPLAY = "KCL table billing mode";
58+
public static final String SRC_KCL_TABLE_BILLING_MODE_DEFAULT = "PROVISIONED";
59+
4760
public static final String DST_TOPIC_PREFIX_CONFIG = "kafka.topic.prefix";
4861
public static final String DST_TOPIC_PREFIX_DOC = "Define Kafka topic destination prefix. End will be the name of a table.";
4962
public static final String DST_TOPIC_PREFIX_DISPLAY = "Topic prefix";
@@ -55,6 +68,16 @@ public class DynamoDBSourceConnectorConfig extends AbstractConfig {
5568
public static final String REDISCOVERY_PERIOD_DISPLAY = "Rediscovery period";
5669
public static final long REDISCOVERY_PERIOD_DEFAULT = 1 * 60 * 1000; // 1 minute
5770

71+
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG = "resource.tagging.service.endpoint";
72+
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC = "AWS Resource Group Tag API Endpoint. Will use default AWS if not set.";
73+
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY = "AWS Resource Group Tag API Endpoint";
74+
public static final String AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT = null;
75+
76+
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG = "dynamodb.service.endpoint";
77+
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DOC = "AWS DynamoDB API Endpoint. Will use default AWS if not set.";
78+
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY = "AWS DynamoDB API Endpoint";
79+
public static final String AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT = null;
80+
5881
static final ConfigDef config = baseConfigDef();
5982

6083
public DynamoDBSourceConnectorConfig(Map<String, String> props) {
@@ -122,6 +145,42 @@ public static ConfigDef baseConfigDef() {
122145
ConfigDef.Width.MEDIUM,
123146
SRC_DYNAMODB_TABLE_ENV_TAG_VALUE_DISPLAY)
124147

148+
.define(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG,
149+
ConfigDef.Type.STRING,
150+
AWS_DYNAMODB_SERVICE_ENDPOINT_DEFAULT,
151+
ConfigDef.Importance.LOW,
152+
AWS_DYNAMODB_SERVICE_ENDPOINT_DOC,
153+
AWS_GROUP, 7,
154+
ConfigDef.Width.MEDIUM,
155+
AWS_DYNAMODB_SERVICE_ENDPOINT_DISPLAY)
156+
157+
.define(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG,
158+
ConfigDef.Type.STRING,
159+
AWS_RESOURCE_TAGGING_API_ENDPOINT_DEFAULT,
160+
ConfigDef.Importance.LOW,
161+
AWS_RESOURCE_TAGGING_API_ENDPOINT_DOC,
162+
AWS_GROUP, 8,
163+
ConfigDef.Width.MEDIUM,
164+
AWS_RESOURCE_TAGGING_API_ENDPOINT_DISPLAY)
165+
166+
.define(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG,
167+
ConfigDef.Type.LIST,
168+
SRC_DYNAMODB_TABLE_WHITELIST_DEFAULT,
169+
ConfigDef.Importance.LOW,
170+
SRC_DYNAMODB_TABLE_WHITELIST_DOC,
171+
AWS_GROUP, 8,
172+
ConfigDef.Width.MEDIUM,
173+
SRC_DYNAMODB_TABLE_WHITELIST_DISPLAY)
174+
175+
.define(SRC_KCL_TABLE_BILLING_MODE_CONFIG,
176+
ConfigDef.Type.STRING,
177+
SRC_KCL_TABLE_BILLING_MODE_DEFAULT,
178+
ConfigDef.Importance.LOW,
179+
SRC_KCL_TABLE_BILLING_MODE_DOC,
180+
AWS_GROUP, 9,
181+
ConfigDef.Width.MEDIUM,
182+
SRC_KCL_TABLE_BILLING_MODE_DISPLAY)
183+
125184
.define(DST_TOPIC_PREFIX_CONFIG,
126185
ConfigDef.Type.STRING,
127186
DST_TOPIC_PREFIX_DEFAULT,
@@ -148,8 +207,8 @@ public static ConfigDef baseConfigDef() {
148207
CONNECTOR_GROUP, 4,
149208
ConfigDef.Width.MEDIUM,
150209
REDISCOVERY_PERIOD_DISPLAY)
151-
152210
;
211+
153212
}
154213

155214
public static void main(String[] args) {
@@ -160,12 +219,20 @@ public String getAwsRegion() {
160219
return getString(AWS_REGION_CONFIG);
161220
}
162221

163-
public String getAwsAccessKeyId() {
164-
return getString(AWS_ACCESS_KEY_ID_CONFIG);
222+
public Password getAwsAccessKeyId() {
223+
return getPassword(AWS_ACCESS_KEY_ID_CONFIG);
224+
}
225+
226+
public String getAwsAccessKeyIdValue() {
227+
return getPassword(AWS_ACCESS_KEY_ID_CONFIG) == null ? null : getPassword(AWS_ACCESS_KEY_ID_CONFIG).value();
228+
}
229+
230+
public Password getAwsSecretKey() {
231+
return getPassword(AWS_SECRET_KEY_CONFIG);
165232
}
166233

167-
public String getAwsSecretKey() {
168-
return getString(AWS_SECRET_KEY_CONFIG);
234+
public String getAwsSecretKeyValue() {
235+
return getPassword(AWS_SECRET_KEY_CONFIG) == null ? null : getPassword(AWS_SECRET_KEY_CONFIG).value();
169236
}
170237

171238
public String getSrcDynamoDBIngestionTagKey() {
@@ -189,4 +256,20 @@ public long getRediscoveryPeriod() {
189256
public int getInitSyncDelay() {
190257
return (int)get(SRC_INIT_SYNC_DELAY_CONFIG);
191258
}
259+
260+
public String getDynamoDBServiceEndpoint() {
261+
return getString(AWS_DYNAMODB_SERVICE_ENDPOINT_CONFIG);
262+
}
263+
264+
public String getResourceTaggingServiceEndpoint() {
265+
return getString(AWS_RESOURCE_TAGGING_API_ENDPOINT_CONFIG);
266+
}
267+
268+
public List<String> getWhitelistTables() {
269+
return getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) != null ? getList(SRC_DYNAMODB_TABLE_WHITELIST_CONFIG) : null;
270+
}
271+
272+
public BillingMode getKCLTableBillingMode() {
273+
return BillingMode.fromValue(getString(SRC_KCL_TABLE_BILLING_MODE_CONFIG));
274+
}
192275
}

0 commit comments

Comments
 (0)