diff --git a/.gitignore b/.gitignore index db9655ae..5a0fbe95 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ # VS Code .vscode/ +.idea/ \ No newline at end of file diff --git a/README.md b/README.md index 8fc3fbee..4d802bd3 100644 --- a/README.md +++ b/README.md @@ -27,9 +27,9 @@ ClickEvent event = inputQueue.take(); } recordsPut.getAndIncrement(); - ListenableFuture f = + CompletableFuture f = kpl.addUserRecord(STREAM_NAME, partitionKey, data); - Futures.addCallback(f, new FutureCallback() { + f.whenCompleteAsync(f, new BiConusmer() { ... ... ```` diff --git a/java/amazon-kinesis-producer-sample/pom.xml b/java/amazon-kinesis-producer-sample/pom.xml index 329bc2fc..e533f354 100644 --- a/java/amazon-kinesis-producer-sample/pom.xml +++ b/java/amazon-kinesis-producer-sample/pom.xml @@ -11,8 +11,8 @@ maven-compiler-plugin 3.1 - 1.7 - 1.7 + 1.8 + 1.8 diff --git a/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/MetricsAwareSampleProducer.java b/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/MetricsAwareSampleProducer.java index fffde5a1..4205ea16 100644 --- a/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/MetricsAwareSampleProducer.java +++ b/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/MetricsAwareSampleProducer.java @@ -16,11 +16,13 @@ package com.amazonaws.services.kinesis.producer.sample; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; -import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import com.google.common.collect.Iterables; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,10 +32,6 @@ import com.amazonaws.services.kinesis.producer.Metric; import com.amazonaws.services.kinesis.producer.UserRecordFailedException; import com.amazonaws.services.kinesis.producer.UserRecordResult; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * If you haven't looked at {@link SampleProducer}, do so first. @@ -77,9 +75,17 @@ public static void main(String[] args) throws InterruptedException, ExecutionExc final KinesisProducer kinesisProducer = new KinesisProducer(config); // Result handler - final FutureCallback callback = new FutureCallback() { + final BiConsumer callback = new BiConsumer() { @Override - public void onFailure(Throwable t) { + public void accept(UserRecordResult userRecordResult, Throwable throwable) { + if(userRecordResult != null) { + onSuccess(userRecordResult); + } else { + onFailure(throwable); + } + } + + void onFailure(Throwable t) { if (t instanceof UserRecordFailedException) { Attempt last = Iterables.getLast( ((UserRecordFailedException) t).getResult().getAttempts()); @@ -91,8 +97,7 @@ public void onFailure(Throwable t) { System.exit(1); } - @Override - public void onSuccess(UserRecordResult result) { + void onSuccess(UserRecordResult result) { completed.getAndIncrement(); } }; @@ -159,9 +164,9 @@ public void run() { if (sequenceNumber.get() < totalRecordsToPut) { if (kinesisProducer.getOutstandingRecordsCount() < outstandingLimit) { ByteBuffer data = Utils.generateData(sequenceNumber.incrementAndGet(), dataSize); - ListenableFuture f = kinesisProducer.addUserRecord(SampleProducerConfig.STREAM_NAME_DEFAULT, + CompletableFuture f = kinesisProducer.addUserRecord(SampleProducerConfig.STREAM_NAME_DEFAULT, timetstamp, data); - Futures.addCallback(f, callback, Executors.newSingleThreadExecutor()); + f.whenCompleteAsync(callback, Executors.newSingleThreadExecutor()); } else { Thread.sleep(1); } diff --git a/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleProducer.java b/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleProducer.java index e137732d..67e7f3da 100644 --- a/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleProducer.java +++ b/java/amazon-kinesis-producer-sample/src/com/amazonaws/services/kinesis/producer/sample/SampleProducer.java @@ -16,25 +16,18 @@ package com.amazonaws.services.kinesis.producer.sample; import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; import com.amazonaws.services.kinesis.producer.UnexpectedMessageException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.services.kinesis.producer.Attempt; -import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration; import com.amazonaws.services.kinesis.producer.KinesisProducer; import com.amazonaws.services.kinesis.producer.UserRecordFailedException; import com.amazonaws.services.kinesis.producer.UserRecordResult; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; -import com.google.common.util.concurrent.ListenableFuture; /** * The Kinesis Producer Library (KPL) excels at handling large numbers of small @@ -94,9 +87,18 @@ public static void main(String[] args) throws Exception { final AtomicLong completed = new AtomicLong(0); // KinesisProducer.addUserRecord is asynchronous. A callback can be used to receive the results. - final FutureCallback callback = new FutureCallback() { + final BiConsumer callback = new BiConsumer() { + @Override - public void onFailure(Throwable t) { + public void accept(UserRecordResult userRecordResult, Throwable throwable) { + if(userRecordResult != null) { + onSuccess(userRecordResult); + } else { + onFailure(throwable); + } + } + + void onFailure(Throwable t) { // If we see any failures, we will log them. if (t instanceof UserRecordFailedException) { int attempts = ((UserRecordFailedException) t).getResult().getAttempts().size()-1; @@ -119,8 +121,7 @@ public void onFailure(Throwable t) { log.error("Exception during put", t); } - @Override - public void onSuccess(UserRecordResult result) { + void onSuccess(UserRecordResult result) { completed.getAndIncrement(); } }; @@ -133,9 +134,9 @@ public void onSuccess(UserRecordResult result) { public void run() { ByteBuffer data = Utils.generateData(sequenceNumber.get(), config.getDataSize()); // TIMESTAMP is our partition key - ListenableFuture f = + CompletableFuture f = producer.addUserRecord(config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data); - Futures.addCallback(f, callback, callbackThreadPool); + f.whenCompleteAsync(callback, callbackThreadPool); } }; diff --git a/java/amazon-kinesis-producer/pom.xml b/java/amazon-kinesis-producer/pom.xml index 7ee1b7dd..6c2ea724 100644 --- a/java/amazon-kinesis-producer/pom.xml +++ b/java/amazon-kinesis-producer/pom.xml @@ -63,11 +63,6 @@ - - com.google.guava - guava - 31.1-jre - org.slf4j slf4j-api diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java index 085e6774..86f97733 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/IKinesisProducer.java @@ -2,19 +2,19 @@ import java.nio.ByteBuffer; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import com.google.common.util.concurrent.ListenableFuture; import com.amazonaws.services.schemaregistry.common.Schema; public interface IKinesisProducer { - ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data); + CompletableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data); - ListenableFuture addUserRecord(UserRecord userRecord); + CompletableFuture addUserRecord(UserRecord userRecord); - ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data); + CompletableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data); - ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema); + CompletableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema); int getOutstandingRecordsCount(); diff --git a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java index aa88cc5d..af23f91a 100644 --- a/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java +++ b/java/amazon-kinesis-producer/src/main/java/com/amazonaws/services/kinesis/producer/KinesisProducer.java @@ -24,8 +24,6 @@ import com.amazonaws.services.schemaregistry.common.Schema; import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializer; import com.google.common.collect.ImmutableMap; -import com.google.common.util.concurrent.ListenableFuture; -import com.google.common.util.concurrent.SettableFuture; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.protobuf.ByteString; import lombok.AllArgsConstructor; @@ -48,16 +46,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.FutureTask; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.PriorityBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; /** @@ -117,7 +106,7 @@ public int compare(SettableFutureTracker x, SettableFutureTracker y) @Value private static class SettableFutureTracker { @NonNull - private SettableFuture future; + private CompletableFuture future; @NonNull private Instant timestamp; @NonNull @@ -174,8 +163,8 @@ public void run() { } else { // clear the future here as well since the native core has exhausted its retries. SettableFutureTracker futureTracker = getFuture(m); - SettableFuture f = futureTracker.getFuture(); - f.setException(new UnexpectedMessageException("Unexpected message type from child process")); + CompletableFuture f = futureTracker.getFuture(); + f.completeExceptionally(new UnexpectedMessageException("Unexpected message type from child process")); log.error(String.format("Unexpected message type with case %s from child process with message" + " id %s. Removing the submitted future from processing queue.", m.getActualMessageCase(), m.getSourceId())); @@ -197,7 +186,7 @@ public void onError(final Throwable t) { callbackCompletionExecutor.execute(new Runnable() { @Override public void run() { - entry.getValue().getFuture().setException(t); + entry.getValue().getFuture().completeExceptionally(t); } }); } @@ -231,25 +220,25 @@ public void run() { */ private void onPutRecordResult(Message msg) { SettableFutureTracker futureTracker = getFuture(msg); - SettableFuture f = (SettableFuture) futureTracker.getFuture(); + CompletableFuture f = (CompletableFuture) futureTracker.getFuture(); UserRecordResult result = UserRecordResult.fromProtobufMessage(msg.getPutRecordResult()); if (result.isSuccessful()) { - f.set(result); + f.complete(result); } else { - f.setException(new UserRecordFailedException(result)); + f.completeExceptionally(new UserRecordFailedException(result)); } } private void onMetricsResponse(Message msg) { SettableFutureTracker futureTracker = getFuture(msg); - SettableFuture> f = (SettableFuture>) futureTracker.getFuture(); + CompletableFuture> f = (CompletableFuture>) futureTracker.getFuture(); List userMetrics = new ArrayList<>(); MetricsResponse res = msg.getMetricsResponse(); for (Messages.Metric metric : res.getMetricsList()) { userMetrics.add(new Metric(metric)); } - f.set(userMetrics); + f.complete(userMetrics); } private SettableFutureTracker getFuture(Message msg) { @@ -350,7 +339,7 @@ protected KinesisProducer(File inPipe, File outPipe) { } /** - * Put a record asynchronously. A {@link ListenableFuture} is returned that + * Put a record asynchronously. A {@link CompletableFuture} is returned that * can be used to retrieve the result, either by polling or by registering a * callback. * @@ -368,7 +357,7 @@ protected KinesisProducer(File inPipe, File outPipe) { * To add a listener to the future: *

* - * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); + * CompletableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); * *

@@ -401,18 +390,18 @@ protected KinesisProducer(File inPipe, File outPipe) { * if input does not meet stated constraints * @throws DaemonException * if the child process is dead - * @see ListenableFuture + * @see CompletableFuture * @see UserRecordResult * @see KinesisProducerConfiguration#setRecordTtl(long) * @see UserRecordFailedException */ @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data) { + public CompletableFuture addUserRecord(String stream, String partitionKey, ByteBuffer data) { return addUserRecord(stream, partitionKey, null, data); } /** - * Put a record asynchronously. A {@link ListenableFuture} is returned that + * Put a record asynchronously. A {@link CompletableFuture} is returned that * can be used to retrieve the result, either by polling or by registering a * callback. * @@ -430,7 +419,7 @@ public ListenableFuture addUserRecord(String stream, String pa * To add a listener to the future: *

* - * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); + * CompletableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); * *

@@ -458,18 +447,18 @@ public ListenableFuture addUserRecord(String stream, String pa * if input does not meet stated constraints * @throws DaemonException * if the child process is dead - * @see ListenableFuture + * @see CompletableFuture * @see UserRecordResult * @see KinesisProducerConfiguration#setRecordTtl(long) * @see UserRecordFailedException */ @Override - public ListenableFuture addUserRecord(UserRecord userRecord) { + public CompletableFuture addUserRecord(UserRecord userRecord) { return addUserRecord(userRecord.getStreamName(), userRecord.getPartitionKey(), userRecord.getExplicitHashKey(), userRecord.getData(), userRecord.getSchema()); } /** - * Put a record asynchronously. A {@link ListenableFuture} is returned that + * Put a record asynchronously. A {@link CompletableFuture} is returned that * can be used to retrieve the result, either by polling or by registering a * callback. * @@ -487,7 +476,7 @@ public ListenableFuture addUserRecord(UserRecord userRecord) { * To add a listener to the future: *

* - * ListenableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); + * CompletableFuture<PutRecordResult> f = myKinesisProducer.addUserRecord(...); * com.google.common.util.concurrent.Futures.addCallback(f, callback, executor); * *

@@ -525,18 +514,18 @@ public ListenableFuture addUserRecord(UserRecord userRecord) { * if input does not meet stated constraints * @throws DaemonException * if the child process is dead - * @see ListenableFuture + * @see CompletableFuture * @see UserRecordResult * @see KinesisProducerConfiguration#setRecordTtl(long) * @see UserRecordFailedException */ @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) { + public CompletableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data) { return addUserRecord(stream, partitionKey, explicitHashKey, data, null); } @Override - public ListenableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { + public CompletableFuture addUserRecord(String stream, String partitionKey, String explicitHashKey, ByteBuffer data, Schema schema) { if (stream == null) { throw new IllegalArgumentException("Stream name cannot be null"); } @@ -600,7 +589,7 @@ public ListenableFuture addUserRecord(String stream, String pa } long id = messageNumber.getAndIncrement(); - SettableFuture f = SettableFuture.create(); + CompletableFuture f = new CompletableFuture<>(); FutureTask task = null; if(config.getUserRecordTimeoutInMillis() > 0) { task = new FutureTask(new FutureTimeoutRunnableTask(id), "TimedOut"); @@ -635,9 +624,9 @@ private class FutureTimeoutRunnableTask implements Runnable { public void run() { SettableFutureTracker futureTracker = getFutureTracker(id); totalFutureTimeouts.getAndIncrement(); - SettableFuture f = futureTracker.getFuture(); + CompletableFuture f = futureTracker.getFuture(); String message = "Message id " + id + " timeout out. Removing the submitted future from processing queue."; - f.setException(new FutureTimedOutException(message)); + f.completeExceptionally(new FutureTimedOutException(message)); log.error(message); } } @@ -725,7 +714,7 @@ public List getMetrics(String metricName, int windowSeconds) throws Inte } long id = messageNumber.getAndIncrement(); - SettableFuture> f = SettableFuture.create(); + CompletableFuture> f = new CompletableFuture<>(); FutureTask task = null; if(config.getUserRecordTimeoutInMillis() > 0) { task = new FutureTask(new FutureTimeoutRunnableTask(id), "TimedOut");