Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove Guava dependency, use CompletableFuture #502

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@

# VS Code
.vscode/
.idea/
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ ClickEvent event = inputQueue.take();
}
recordsPut.getAndIncrement();

ListenableFuture<UserRecordResult> f =
CompletableFuture<UserRecordResult> f =
kpl.addUserRecord(STREAM_NAME, partitionKey, data);
Futures.addCallback(f, new FutureCallback<UserRecordResult>() {
f.whenCompleteAsync(f, new BiConusmer<UserRecordResult, Throwable>() {
...
...
````
Expand Down
4 changes: 2 additions & 2 deletions java/amazon-kinesis-producer-sample/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@
package com.amazonaws.services.kinesis.producer.sample;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import com.google.common.collect.Iterables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,10 +32,6 @@
import com.amazonaws.services.kinesis.producer.Metric;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/**
* If you haven't looked at {@link SampleProducer}, do so first.
Expand Down Expand Up @@ -77,9 +75,17 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc
final KinesisProducer kinesisProducer = new KinesisProducer(config);

// Result handler
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
final BiConsumer<UserRecordResult, Throwable> callback = new BiConsumer<UserRecordResult, Throwable>() {
@Override
public void onFailure(Throwable t) {
public void accept(UserRecordResult userRecordResult, Throwable throwable) {
if(userRecordResult != null) {
onSuccess(userRecordResult);
} else {
onFailure(throwable);
}
}

void onFailure(Throwable t) {
if (t instanceof UserRecordFailedException) {
Attempt last = Iterables.getLast(
((UserRecordFailedException) t).getResult().getAttempts());
Expand All @@ -91,8 +97,7 @@ public void onFailure(Throwable t) {
System.exit(1);
}

@Override
public void onSuccess(UserRecordResult result) {
void onSuccess(UserRecordResult result) {
completed.getAndIncrement();
}
};
Expand Down Expand Up @@ -159,9 +164,9 @@ public void run() {
if (sequenceNumber.get() < totalRecordsToPut) {
if (kinesisProducer.getOutstandingRecordsCount() < outstandingLimit) {
ByteBuffer data = Utils.generateData(sequenceNumber.incrementAndGet(), dataSize);
ListenableFuture<UserRecordResult> f = kinesisProducer.addUserRecord(SampleProducerConfig.STREAM_NAME_DEFAULT,
CompletableFuture<UserRecordResult> f = kinesisProducer.addUserRecord(SampleProducerConfig.STREAM_NAME_DEFAULT,
timetstamp, data);
Futures.addCallback(f, callback, Executors.newSingleThreadExecutor());
f.whenCompleteAsync(callback, Executors.newSingleThreadExecutor());
} else {
Thread.sleep(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,18 @@
package com.amazonaws.services.kinesis.producer.sample;

import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

import com.amazonaws.services.kinesis.producer.UnexpectedMessageException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.producer.Attempt;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;
import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.UserRecordFailedException;
import com.amazonaws.services.kinesis.producer.UserRecordResult;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;

/**
* The Kinesis Producer Library (KPL) excels at handling large numbers of small
Expand Down Expand Up @@ -94,9 +87,18 @@ public static void main(String[] args) throws Exception {
final AtomicLong completed = new AtomicLong(0);

// KinesisProducer.addUserRecord is asynchronous. A callback can be used to receive the results.
final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
final BiConsumer<UserRecordResult, Throwable> callback = new BiConsumer<UserRecordResult, Throwable>() {

@Override
public void onFailure(Throwable t) {
public void accept(UserRecordResult userRecordResult, Throwable throwable) {
if(userRecordResult != null) {
onSuccess(userRecordResult);
} else {
onFailure(throwable);
}
}

void onFailure(Throwable t) {
// If we see any failures, we will log them.
if (t instanceof UserRecordFailedException) {
int attempts = ((UserRecordFailedException) t).getResult().getAttempts().size()-1;
Expand All @@ -119,8 +121,7 @@ public void onFailure(Throwable t) {
log.error("Exception during put", t);
}

@Override
public void onSuccess(UserRecordResult result) {
void onSuccess(UserRecordResult result) {
completed.getAndIncrement();
}
};
Expand All @@ -133,9 +134,9 @@ public void onSuccess(UserRecordResult result) {
public void run() {
ByteBuffer data = Utils.generateData(sequenceNumber.get(), config.getDataSize());
// TIMESTAMP is our partition key
ListenableFuture<UserRecordResult> f =
CompletableFuture<UserRecordResult> f =
producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data);
Futures.addCallback(f, callback, callbackThreadPool);
f.whenCompleteAsync(callback, callbackThreadPool);
}
};

Expand Down
5 changes: 0 additions & 5 deletions java/amazon-kinesis-producer/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,6 @@
</properties>

<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import com.google.common.util.concurrent.ListenableFuture;
import com.amazonaws.services.schemaregistry.common.Schema;


public interface IKinesisProducer {
ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data);
CompletableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, ByteBuffer data);

ListenableFuture<UserRecordResult> addUserRecord(UserRecord userRecord);
CompletableFuture<UserRecordResult> addUserRecord(UserRecord userRecord);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data);
CompletableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data);

ListenableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema);
CompletableFuture<UserRecordResult> addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema);

int getOutstandingRecordsCount();

Expand Down
Loading