Implementing Efficient and Reliable Producers for AWS Kinesis with the Kinesis Producer Library
kevincdeng committed Aug 3, 2015
1 parent 9dd2da2 commit 6f7cbbd
Showing 15 changed files with 769 additions and 0 deletions.
1 change: 1 addition & 0 deletions aws-blog-kinesis-producer-library/.gitignore
@@ -0,0 +1 @@
.classpath
45 changes: 45 additions & 0 deletions aws-blog-kinesis-producer-library/README
@@ -0,0 +1,45 @@
# Implementing Efficient and Reliable Producers for AWS Kinesis with the Kinesis Producer Library

## Overview

This repository contains the code samples used in the AWS Big Data Blog post of the same name (TODO link).

The code contains a single Java package with several implementations of the base class `AbstractClickEventsToKinesis`, as described in the blog post. A driver class, `ClickEventsToKinesisTestDriver`, can be run to try out each implementation.

## Running the Sample

First, create a stream to put data into. The stream should have enough shards if you want to try out the high-throughput implementations.
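
For example, a stream can be created with the AWS SDK for Java, which is already a dependency of this project. This is a minimal sketch, not part of the repository; the class name, stream name, region, and shard count below are placeholders to replace with your own values:

```java
import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

public class CreateStream {
    public static void main(String[] args) {
        // Placeholder values: stream "myStream" with 4 shards in us-east-1.
        AmazonKinesis kinesis = new AmazonKinesisClient()
                .withRegion(Regions.US_EAST_1);
        kinesis.createStream("myStream", 4);
    }
}
```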

You can use any stream name and region, but you'll have to modify `AbstractClickEventsToKinesis` to use the name and region you have chosen.
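
The defaults are defined near the top of `AbstractClickEventsToKinesis` (the full file appears further down in this commit):

```java
protected final static String STREAM_NAME = "myStream";
protected final static String REGION = "us-east-1";
```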

To execute the test driver:

```
mvn compile exec:java -Dexec.mainClass=com.amazonaws.services.kinesis.producer.demo.ClickEventsToKinesisTestDriver
```

You should see output that looks like this:

```
[INFO] --- exec-maven-plugin:1.4.0:java (default-cli) @ aws-blog-kinesis-producer-library ---
0 seconds, 0 records total, 0.00 RPS (avg last 10s)
1 seconds, 0 records total, 0.00 RPS (avg last 10s)
2 seconds, 4 records total, 0.40 RPS (avg last 10s)
3 seconds, 7 records total, 0.70 RPS (avg last 10s)
4 seconds, 12 records total, 1.20 RPS (avg last 10s)
5 seconds, 16 records total, 1.60 RPS (avg last 10s)
6 seconds, 22 records total, 2.20 RPS (avg last 10s)
7 seconds, 28 records total, 2.80 RPS (avg last 10s)
8 seconds, 31 records total, 3.10 RPS (avg last 10s)
9 seconds, 36 records total, 3.60 RPS (avg last 10s)
10 seconds, 46 records total, 4.60 RPS (avg last 10s)
11 seconds, 55 records total, 5.50 RPS (avg last 10s)
...
```
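
The driver class `ClickEventsToKinesisTestDriver` itself is not reproduced in this excerpt. Purely to illustrate how the pieces fit together, a minimal driver along these lines would produce output similar to the above; this is a sketch, not the actual class, and it assumes a `ClickEvent(sessionId, payload)` constructor that is not shown here:

```java
package com.amazonaws.services.kinesis.producer.demo;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch only; the real ClickEventsToKinesisTestDriver may differ.
public class DriverSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<ClickEvent> events = new ArrayBlockingQueue<>(65536);

        // Feed synthetic click events into the queue.
        Thread generator = new Thread(() -> {
            long i = 0;
            while (true) {
                try {
                    // Assumed constructor: ClickEvent(sessionId, payload).
                    events.put(new ClickEvent("session-" + (i % 100), "payload-" + i));
                    i++;
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        generator.setDaemon(true);
        generator.start();

        // The worker under test; swap in any of the other implementations.
        AbstractClickEventsToKinesis worker = new BasicClickEventsToKinesis(events);
        new Thread(worker).start();

        // Print a progress line once per second (the real driver also reports RPS).
        for (int seconds = 0; ; seconds++) {
            System.out.println(String.format(
                    "%d seconds, %d records total", seconds, worker.recordsPut()));
            Thread.sleep(1000);
        }
    }
}
```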

To try out a different implementation of the producer, find and change the following line in `ClickEventsToKinesisTestDriver`:

```java
// Change this line to use a different implementation
final AbstractClickEventsToKinesis worker = new BasicClickEventsToKinesis(events);
```
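
For example, to exercise the KPL-based implementation from this repository instead:

```java
final AbstractClickEventsToKinesis worker = new AdvancedKPLClickEventsToKinesis(events);
```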
38 changes: 38 additions & 0 deletions aws-blog-kinesis-producer-library/pom.xml
@@ -0,0 +1,38 @@
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-blog-kinesis-producer-library</artifactId>
    <version>1.0</version>

    <dependencies>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-producer</artifactId>
            <version>0.10.0</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-kinesis</artifactId>
            <version>1.9.37</version>
        </dependency>
        <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>aws-java-sdk-cloudwatch</artifactId>
            <version>1.9.37</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.3</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
39 changes: 39 additions & 0 deletions aws-blog-kinesis-producer-library/src/main/java/com/amazonaws/services/kinesis/producer/demo/AbstractClickEventsToKinesis.java
@@ -0,0 +1,39 @@
package com.amazonaws.services.kinesis.producer.demo;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

public abstract class AbstractClickEventsToKinesis implements Runnable {
    protected final static String STREAM_NAME = "myStream";
    protected final static String REGION = "us-east-1";

    protected final BlockingQueue<ClickEvent> inputQueue;
    protected volatile boolean shutdown = false;
    protected final AtomicLong recordsPut = new AtomicLong(0);

    protected AbstractClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        this.inputQueue = inputQueue;
    }

    @Override
    public void run() {
        while (!shutdown) {
            try {
                runOnce();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    public long recordsPut() {
        return recordsPut.get();
    }

    public void stop() {
        shutdown = true;
    }

    protected abstract void runOnce() throws Exception;
}

103 changes: 103 additions & 0 deletions aws-blog-kinesis-producer-library/src/main/java/com/amazonaws/services/kinesis/producer/demo/AdvancedKPLClickEventsToKinesis.java
@@ -0,0 +1,103 @@
package com.amazonaws.services.kinesis.producer.demo;

import java.nio.ByteBuffer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.Metric;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

public class AdvancedKPLClickEventsToKinesis extends AbstractClickEventsToKinesis {
    private static final Random RANDOM = new Random();
    private static final Log log = LogFactory.getLog(AdvancedKPLClickEventsToKinesis.class);

    private final KinesisProducer kinesis;

    protected AdvancedKPLClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new KinesisProducer(new KinesisProducerConfiguration()
                .setRegion(REGION));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        String payload = event.getPayload();
        ByteBuffer data = ByteBuffer.wrap(payload.getBytes("UTF-8"));
        // Apply backpressure: wait until the KPL's internal buffer drains below
        // 10,000 outstanding records before adding more.
        while (kinesis.getOutstandingRecordsCount() > 1e4) {
            Thread.sleep(1);
        }
        recordsPut.getAndIncrement();

        ListenableFuture<UserRecordResult> f =
                kinesis.addUserRecord(STREAM_NAME, partitionKey, data);
        Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
            @Override
            public void onSuccess(UserRecordResult result) {
                long totalTime = result.getAttempts().stream()
                        .mapToLong(a -> a.getDelay() + a.getDuration())
                        .sum();
                // Only log with a small probability, otherwise it'll be very spammy
                if (RANDOM.nextDouble() < 1e-5) {
                    log.info(String.format(
                            "Successfully put record, partitionKey=%s, payload=%s, " +
                            "sequenceNumber=%s, shardId=%s, took %d attempts, totalling %s ms",
                            partitionKey, payload, result.getSequenceNumber(),
                            result.getShardId(), result.getAttempts().size(), totalTime));
                }
            }

            @Override
            public void onFailure(Throwable t) {
                if (t instanceof UserRecordFailedException) {
                    UserRecordFailedException e = (UserRecordFailedException) t;
                    UserRecordResult result = e.getResult();

                    String errorList =
                            StringUtils.join(result.getAttempts().stream()
                                    .map(a -> String.format(
                                            "Delay after prev attempt: %d ms, Duration: %d ms, Code: %s, Message: %s",
                                            a.getDelay(), a.getDuration(), a.getErrorCode(), a.getErrorMessage()))
                                    .collect(Collectors.toList()), "\n");

                    log.error(String.format(
                            "Record failed to put, partitionKey=%s, payload=%s, attempts:\n%s",
                            partitionKey, payload, errorList));
                }
            }
        });
    }

    @Override
    public long recordsPut() {
        // Read the count from the KPL's own UserRecordsPut metric rather than
        // relying on the local counter.
        try {
            return kinesis.getMetrics("UserRecordsPut").stream()
                    .filter(m -> m.getDimensions().size() == 2)
                    .findFirst()
                    .map(Metric::getSum)
                    .orElse(0.0)
                    .longValue();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public void stop() {
        super.stop();
        // Flush any records still buffered inside the KPL, then release its resources.
        kinesis.flushSync();
        kinesis.destroy();
    }
}
55 changes: 55 additions & 0 deletions aws-blog-kinesis-producer-library/src/main/java/com/amazonaws/services/kinesis/producer/demo/AggregatingClickEventsToKinesis.java
@@ -0,0 +1,55 @@
package com.amazonaws.services.kinesis.producer.demo;

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

public class AggregatingClickEventsToKinesis extends AbstractClickEventsToKinesis {
    private final AmazonKinesis kinesis;

    private String partitionKey;
    private String payload;
    private int numAggregated;

    public AggregatingClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(Regions.fromName(REGION));
        reset();
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        String payload = event.getPayload();

        // Keep the partition key of the first event in the aggregated record.
        if (this.partitionKey == null) {
            this.partitionKey = partitionKey;
        }

        // Append the new payload, one event per line.
        if (this.payload == null) {
            this.payload = payload;
        } else {
            this.payload += "\r\n" + payload;
        }
        this.numAggregated++;

        // Send the aggregated record once it reaches roughly 1 KB.
        if (this.payload.length() >= 1024) {
            kinesis.putRecord(
                    STREAM_NAME,
                    ByteBuffer.wrap(this.payload.getBytes("UTF-8")),
                    this.partitionKey);
            recordsPut.getAndAdd(numAggregated);
            reset();
        }
    }

    private void reset() {
        this.partitionKey = null;
        this.payload = null;
        this.numAggregated = 0;
    }
}
26 changes: 26 additions & 0 deletions aws-blog-kinesis-producer-library/src/main/java/com/amazonaws/services/kinesis/producer/demo/BasicClickEventsToKinesis.java
@@ -0,0 +1,26 @@
package com.amazonaws.services.kinesis.producer.demo;

import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;

public class BasicClickEventsToKinesis extends AbstractClickEventsToKinesis {
    private final AmazonKinesis kinesis;

    public BasicClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(Regions.fromName(REGION));
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(event.getPayload().getBytes("UTF-8"));
        kinesis.putRecord(STREAM_NAME, data, partitionKey);
        recordsPut.getAndIncrement();
    }
}
69 changes: 69 additions & 0 deletions aws-blog-kinesis-producer-library/src/main/java/com/amazonaws/services/kinesis/producer/demo/BatchedClickEventsToKinesis.java
@@ -0,0 +1,69 @@
package com.amazonaws.services.kinesis.producer.demo;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;

public class BatchedClickEventsToKinesis extends AbstractClickEventsToKinesis {
    protected AmazonKinesis kinesis;
    protected List<PutRecordsRequestEntry> entries;

    private int dataSize;

    public BatchedClickEventsToKinesis(BlockingQueue<ClickEvent> inputQueue) {
        super(inputQueue);
        kinesis = new AmazonKinesisClient().withRegion(Regions.fromName(REGION));
        entries = new ArrayList<>();
        dataSize = 0;
    }

    @Override
    protected void runOnce() throws Exception {
        ClickEvent event = inputQueue.take();
        String partitionKey = event.getSessionId();
        ByteBuffer data = ByteBuffer.wrap(event.getPayload().getBytes("UTF-8"));
        recordsPut.getAndIncrement();

        addEntry(new PutRecordsRequestEntry()
                .withPartitionKey(partitionKey)
                .withData(data));
    }

    @Override
    public long recordsPut() {
        // Don't count entries that are still buffered and haven't been sent yet.
        return super.recordsPut() - entries.size();
    }

    @Override
    public void stop() {
        super.stop();
        flush();
    }

    protected void flush() {
        if (entries.isEmpty()) {
            // Nothing buffered; avoid sending an empty PutRecords request.
            return;
        }
        kinesis.putRecords(new PutRecordsRequest()
                .withStreamName(STREAM_NAME)
                .withRecords(entries));
        entries.clear();
    }

    protected void addEntry(PutRecordsRequestEntry entry) {
        // PutRecords accepts at most 500 records and 5 MB of data per request;
        // flush the current batch before exceeding either limit.
        int newDataSize = dataSize + entry.getData().remaining() +
                entry.getPartitionKey().length();
        if (newDataSize <= 5 * 1024 * 1024 && entries.size() < 500) {
            dataSize = newDataSize;
            entries.add(entry);
        } else {
            flush();
            dataSize = 0;
            addEntry(entry);
        }
    }
}