From 354143fdb93619e1c6424ede2e302b346c1f53b4 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Wed, 13 Sep 2023 15:15:55 +0200 Subject: [PATCH] introduce a future implementation tailored to SmallRye Fault Tolerance needs --- .../faulttolerance/core/Completer.java | 56 ++ .../smallrye/faulttolerance/core/Future.java | 160 +++++ .../faulttolerance/core/FutureImpl.java | 188 ++++++ .../faulttolerance/core/FutureStressTest.java | 57 ++ .../faulttolerance/core/FutureTest.java | 592 ++++++++++++++++++ .../faulttolerance/core/util/Action.java | 12 + 6 files changed, 1065 insertions(+) create mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/Completer.java create mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/Future.java create mode 100644 implementation/core/src/main/java/io/smallrye/faulttolerance/core/FutureImpl.java create mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureStressTest.java create mode 100644 implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureTest.java diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Completer.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Completer.java new file mode 100644 index 00000000..66331b68 --- /dev/null +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Completer.java @@ -0,0 +1,56 @@ +package io.smallrye.faulttolerance.core; + +/** + * Creator and controller of a {@link Future}. An asynchronous computation that + * produces a future has to create a completer and return its future object. + * Afterwards, the completer is used to complete the future either with a value, + * using {@link #complete(Object) Completer.complete()}, or with an error, using + * {@link #completeWithError(Throwable) Completer.completeWithError()}. + *

+ * If the completer is supplied a cancellation callback using {@link #onCancel(Runnable)}, + * a successful cancellation request on the future calls the cancellation callback. + * + * @param type of the result of the computation + */ +public interface Completer { + /** + * Creates a new completer. + * + * @return a new completer; never {@code null} + * @param type of the result of the computation + */ + static Completer create() { + return new FutureImpl<>(); + } + + /** + * Completes the future with a value, if pending. + * If the future is already complete, does nothing. + * + * @param value the value with which the future is completed; may be {@code null} + */ + void complete(T value); + + /** + * Completes the future with an error, if pending. + * If the future is already complete, does nothing. + * + * @param error the error with which the future is completed; must not be {@code null} + */ + void completeWithError(Throwable error); + + /** + * Sets the cancellation callback. Note that this method may be called at most once; + * subsequent calls will result in an exception. + * + * @param cancellationCallback the cancellation callback; may not be {@code null} + */ + void onCancel(Runnable cancellationCallback); + + /** + * Returns the future created and controlled by this completer. + * + * @return the future; never {@code null} + */ + Future future(); +} diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Future.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Future.java new file mode 100644 index 00000000..b55dc966 --- /dev/null +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/Future.java @@ -0,0 +1,160 @@ +package io.smallrye.faulttolerance.core; + +import java.util.concurrent.Callable; +import java.util.function.BiConsumer; + +/** + * Represents a computation that may still be running, and allows obtaining + * a result of the computation once available. + *

+ * When the computation is still running, we say that the future is pending. + * Once the computation finishes, we say that the future is complete. + * Alternatively, after successful cancellation, we say the future is cancelled. + * The computation may finish with two kinds of outcomes: success, producing + * a value, and failure, producing an error. + *

+ * To obtain the result, consumers of the future are expected to register + * a completion callback using {@link #then(BiConsumer) Future.then()}. + * The callback is called with a pair [value, error] once the future completes. + * To distinguish a sucessful outcome from a failure, the error should be + * tested. If the error is {@code null}, the outcome is successful. Note that + * a success may still produce a {@code null} value. + *

+ * The callback may be registered before or after the future completes, + * and is guaranteed to be called exactly once. The thread on which the callback + * is called is not specified; it may be the thread that completes the future + * or the thread that registers the callback. Only one callback may be registered; + * attempts to register a second callback end up with an exception. + *

+ * Future objects are created by a {@link Completer}. The {@link Completer} is + * also the only way through which the future may be completed. For convenience, + * static factory methods are provided to construct already complete futures: + * {@link #of(Object) Future.of()}, {@link #ofError(Throwable) Future.ofError()}, + * and {@link #from(Callable) Future.from()}. + *

+ * A future consumer may request cancellation of the computation by calling + * {@link #cancel() Future.cancel()}. This is only possible while the future is + * pending; when the future is already complete, this is a noop. + *

+ * Unlike common {@code Future} abstractions, this one is fairly limited. + * There may only be one completion callback, and there are no combinators + * such as {@code map} or {@code flatMap}. + * + * @param type of the result of the computation + */ +public interface Future { + /** + * Returns a future that is already complete with given {@code value}. + * + * @param value the value; may be {@code null} + * @return the future that is already complete with the value; never {@code null} + * @param type of the value + */ + static Future of(T value) { + Completer completer = Completer.create(); + completer.complete(value); + return completer.future(); + } + + /** + * Returns a future that is already complete with given {@code error}. + * + * @param error the error; must not be {@code null} + * @return the future that is already complete with the error; never {@code null} + * @param type of hypothetical result; only for type inference + */ + static Future ofError(Throwable error) { + Completer completer = Completer.create(); + completer.completeWithError(error); + return completer.future(); + } + + /** + * Returns a future that is already complete with the outcome of given {@code callable} + * (which may be a returned value or a thrown error). + * + * @param callable the callable to call; must not be {@code null} + * @return the future that is complete with the outcome of the {@code callable}; never {@code null} + * @param type of the result of given {@code callable} + */ + static Future from(Callable callable) { + Completer completer = Completer.create(); + try { + T result = callable.call(); + completer.complete(result); + } catch (Exception e) { + completer.completeWithError(e); + } + return completer.future(); + } + + /** + * Registers a completion callback with this future. The first argument + * of the {@link BiConsumer} is the value of the future, the second argument + * is the error. + *

+ * Value may be {@code null} in case of a success, but error is never {@code null} + * in case of a failure. Therefore, idiomatic usage looks like: + * + *

+     * future.then((value, error) -> {
+     *     if (error == null) {
+     *         ... use value ...
+     *     } else {
+     *         ... use error ...
+     *     }
+     * });
+     * 
+ * + * @param callback the completion callback to be registered; must not be {@code null} + */ + void then(BiConsumer callback); + + /** + * Registers a completion callback with this future. The callback forwards + * the result of this future into the given completer. + * + * @param completer the completer to which the result of this future is forwarded; + * must not be {@code null} + */ + void thenComplete(Completer completer); + + /** + * Returns whether this future is complete. + * + * @return {@code true} if this future is complete, {@code false} otherwise + */ + boolean isComplete(); + + /** + * Returns whether this future is cancelled. + * + * @return {@code true} if this future is cancelled, {@code false} otherwise + */ + boolean isCancelled(); + + /** + * Blocks the calling thread until this future is complete or cancelled, + * and then returns the value of this future or throws the error, or throws + * {@link java.util.concurrent.CancellationException CancellationException}. + * In case this future is already complete or cancelled when this method + * is called, no blocking occurs. + *

+ * The blocked thread may be interrupted, in which case this method throws + * {@link InterruptedException}. + *

+ * This method should rarely be used without previous checking with {@link #isComplete()} + * or {@link #isCancelled()}. + * + * @return the value of this future; may be {@code null} + * @throws Throwable the error of this future, {@code CancellationException} or {@code InterruptedException} + */ + T awaitBlocking() throws Throwable; + + /** + * Requests cancellation of the computation represented by this future. + * + * @see Completer + */ + void cancel(); +} diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FutureImpl.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FutureImpl.java new file mode 100644 index 00000000..15c419c6 --- /dev/null +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FutureImpl.java @@ -0,0 +1,188 @@ +package io.smallrye.faulttolerance.core; + +import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull; + +import java.lang.invoke.ConstantBootstraps; +import java.lang.invoke.MethodHandles; +import java.lang.invoke.MethodHandles.Lookup; +import java.lang.invoke.VarHandle; +import java.util.concurrent.CancellationException; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.function.BiConsumer; + +final class FutureImpl implements Future, Completer { + // + // [ PENDING ] ---> [ COMPLETING ] ---> [ COMPLETE ] ---> [ DELIVERED ] + // | + // +-> [ CANCELLED ] + // + private static final int STATE_PENDING = 0; + private static final int STATE_COMPLETING = 1; + private static final int STATE_COMPLETE = 2; + private static final int STATE_DELIVERED = 3; + private static final int STATE_CANCELLED = 4; + + private static final Lookup LOOKUP = MethodHandles.lookup(); + private static final VarHandle STATE = ConstantBootstraps.fieldVarHandle(LOOKUP, + "state", VarHandle.class, FutureImpl.class, int.class); + private static final VarHandle COMPLETION_CALLBACK = ConstantBootstraps.fieldVarHandle(LOOKUP, + "completionCallback", VarHandle.class, FutureImpl.class, BiConsumer.class); + private static final VarHandle CANCELLATION_CALLBACK = ConstantBootstraps.fieldVarHandle(LOOKUP, + "cancellationCallback", VarHandle.class, FutureImpl.class, Runnable.class); + + private static final class ExceptionResult { + private final Throwable exception; + + private ExceptionResult(Throwable exception) { + this.exception = exception; + } + } + + private volatile int state = STATE_PENDING; + private volatile Object result; // value or `ExceptionResult` for error + + private volatile BiConsumer completionCallback = null; + private final Barrier completionBarrier = new Barrier(); + + private volatile Runnable cancellationCallback = null; + + FutureImpl() { + } + + @Override + public void complete(T value) { + if (STATE.compareAndSet(this, STATE_PENDING, STATE_COMPLETING)) { + this.result = value; + this.state = STATE_COMPLETE; + attemptDelivery(); + completionBarrier.open(); + } + } + + @Override + public void completeWithError(Throwable error) { + checkNotNull(error, "Error must be set"); + if (STATE.compareAndSet(this, STATE_PENDING, STATE_COMPLETING)) { + this.result = new ExceptionResult(error); + this.state = STATE_COMPLETE; + attemptDelivery(); + completionBarrier.open(); + } + } + + @Override + public void onCancel(Runnable cancellationCallback) { + checkNotNull(cancellationCallback, "Cancellation callback must be set"); + if (!CANCELLATION_CALLBACK.compareAndSet(this, null, cancellationCallback)) { + throw new IllegalStateException("Cancellation callback has already been set"); + } + } + + @Override + public Future future() { + return this; + } + + @Override + public void then(BiConsumer callback) { + checkNotNull(callback, "Completion callback must be set"); + if (COMPLETION_CALLBACK.compareAndSet(this, null, callback)) { + attemptDelivery(); + } else { + throw new IllegalStateException("Completion callback has already been set"); + } + } + + @Override + public void thenComplete(Completer completer) { + checkNotNull(completer, "Completer must be set"); + then((value, error) -> { + if (error == null) { + completer.complete(value); + } else { + completer.completeWithError(error); + } + }); + } + + @Override + public boolean isComplete() { + int state = this.state; + return state == STATE_COMPLETE || state == STATE_DELIVERED; + } + + @Override + public boolean isCancelled() { + return this.state == STATE_CANCELLED; + } + + @Override + public T awaitBlocking() throws Throwable { + try { + completionBarrier.await(); + } catch (InterruptedException e) { + // will throw `InterruptedException` below, if not complete or cancelled + } + + int state = this.state; + if (state == STATE_COMPLETE || state == STATE_DELIVERED) { + Object result = this.result; + if (result instanceof ExceptionResult) { + throw ((ExceptionResult) result).exception; + } + return (T) result; + } + if (state == STATE_CANCELLED) { + throw new CancellationException(); + } + + // not complete or cancelled, `completionBarrier.await()` above + // must have thrown `InterruptedException` + throw new InterruptedException(); + } + + @Override + public void cancel() { + if (STATE.compareAndSet(this, STATE_PENDING, STATE_CANCELLED)) { + Runnable cancellationCallback = this.cancellationCallback; + if (cancellationCallback != null) { + cancellationCallback.run(); + } + } + } + + private void attemptDelivery() { + BiConsumer callback = this.completionCallback; + if (callback != null && STATE.compareAndSet(this, STATE_COMPLETE, STATE_DELIVERED)) { + Object result = this.result; + if (result instanceof ExceptionResult) { + callback.accept(null, ((ExceptionResult) result).exception); + } else { + callback.accept((T) result, null); + } + } + } + + // adapted from `AbstractQueuedSynchronizer` + private static class Barrier extends AbstractQueuedSynchronizer { + void await() throws InterruptedException { + acquireSharedInterruptibly(1); + } + + void open() { + releaseShared(1); + } + + // --- + // implementation details + + protected int tryAcquireShared(int ignored) { + return getState() != 0 ? 1 : -1; + } + + protected boolean tryReleaseShared(int ignored) { + setState(1); + return true; + } + } +} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureStressTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureStressTest.java new file mode 100644 index 00000000..55db5e0d --- /dev/null +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureStressTest.java @@ -0,0 +1,57 @@ +package io.smallrye.faulttolerance.core; + +import static io.smallrye.faulttolerance.core.util.Action.startThread; +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.core.util.Action; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; +import io.smallrye.faulttolerance.core.util.party.Party; + +public class FutureStressTest { + @Test + public void test() throws InterruptedException { + for (int i = 0; i < 20_000; i++) { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicInteger callbackCounter = new AtomicInteger(); + + Party party = Party.create(2); + Barrier resultBarrier = Barrier.noninterruptible(); + + Action completerAction = () -> { + party.participant().attend(); + completer.complete("foobar"); + }; + Action callbackAction = () -> { + party.participant().attend(); + completer.future().then((value, error) -> { + result.set(value); + callbackCounter.incrementAndGet(); + resultBarrier.open(); + }); + }; + + // in practice, the thread that starts first has an advantage + // this makes sure that the advantage is evenly distributed + if (ThreadLocalRandom.current().nextBoolean()) { + startThread(callbackAction); + startThread(completerAction); + } else { + startThread(completerAction); + startThread(callbackAction); + } + + party.organizer().waitForAll(); + party.organizer().disband(); + resultBarrier.await(); + assertThat(result).hasValue("foobar"); + assertThat(callbackCounter).hasValue(1); + } + } +} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureTest.java new file mode 100644 index 00000000..cab0bf59 --- /dev/null +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/FutureTest.java @@ -0,0 +1,592 @@ +package io.smallrye.faulttolerance.core; + +import static io.smallrye.faulttolerance.core.util.Action.startThread; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatCode; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.core.util.TestException; +import io.smallrye.faulttolerance.core.util.barrier.Barrier; + +public class FutureTest { + @Test + public void singleThread_completeBeforeCallback_success() { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.complete("foobar"); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + } + + @Test + public void singleThread_completeBeforeCallback_failure() { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.completeWithError(new TestException()); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + } + + @Test + public void singleThread_completeAfterCallback_success() { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + completer.complete("foobar"); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + } + + @Test + public void singleThread_completeAfterCallback_failure() { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + completer.completeWithError(new TestException()); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + } + + @Test + public void singleThread_cancelBeforeCompletion_success() { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + completer.future().cancel(); + + completer.complete("foobar"); + + assertThat(result).hasValue(null); + assertThat(failure).hasValue(null); + assertThat(cancelled).isTrue(); + } + + @Test + public void singleThread_cancelBeforeCompletion_failure() { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + completer.future().cancel(); + + completer.completeWithError(new TestException()); + + assertThat(result).hasValue(null); + assertThat(failure).hasValue(null); + assertThat(cancelled).isTrue(); + } + + @Test + public void singleThread_cancelAfterCompletion_success() { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.complete("foobar"); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + completer.future().cancel(); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + assertThat(cancelled).isFalse(); + } + + @Test + public void singleThread_cancelAfterCompletion_failure() { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer.completeWithError(new TestException()); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + completer.future().cancel(); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + assertThat(cancelled).isFalse(); + } + + @Test + public void singleThread_multipleCallbacks() { + Completer completer = Completer.create(); + + assertThatCode(() -> { + completer.future().then((value, error) -> { + }); + }).doesNotThrowAnyException(); + + assertThatCode(() -> { + completer.future().then((value, error) -> { + }); + }).isInstanceOf(IllegalStateException.class); + } + + // --- + + @Test + public void twoThreads_completeBeforeCallback_success() throws InterruptedException { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completer.complete("foobar"); + deliveryBarrier.open(); + }); + + deliveryBarrier.await(); + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + assertThat(deliveryThread).hasValue(Thread.currentThread()); + } + + @Test + public void twoThreads_completeBeforeCallback_failure() throws InterruptedException { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completer.completeWithError(new TestException()); + deliveryBarrier.open(); + }); + + deliveryBarrier.await(); + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + assertThat(deliveryThread).hasValue(Thread.currentThread()); + } + + @Test + public void twoThreads_completeAfterCallback_success() throws InterruptedException { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier completionBarrier = Barrier.noninterruptible(); + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completionBarrier.await(); + completer.complete("foobar"); + deliveryBarrier.open(); + }); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completionBarrier.open(); + deliveryBarrier.await(); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + assertThat(deliveryThread).hasValueMatching(thread -> thread != Thread.currentThread()); + } + + @Test + public void twoThreads_completeAfterCallback_failure() throws InterruptedException { + Completer completer = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier completionBarrier = Barrier.noninterruptible(); + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completionBarrier.await(); + completer.completeWithError(new TestException()); + deliveryBarrier.open(); + }); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completionBarrier.open(); + deliveryBarrier.await(); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + assertThat(deliveryThread).hasValueMatching(thread -> thread != Thread.currentThread()); + } + + @Test + public void twoThreads_cancelBeforeCompletion_success() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier completionBarrier = Barrier.noninterruptible(); + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completionBarrier.await(); + completer.complete("foobar"); + deliveryBarrier.open(); + }); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completer.future().cancel(); + completionBarrier.open(); + deliveryBarrier.await(); + + assertThat(result).hasValue(null); + assertThat(failure).hasValue(null); + assertThat(deliveryThread).hasValue(null); + assertThat(cancelled).isTrue(); + } + + @Test + public void twoThreads_cancelBeforeCompletion_failure() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier completionBarrier = Barrier.noninterruptible(); + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completionBarrier.await(); + completer.completeWithError(new TestException()); + deliveryBarrier.open(); + }); + + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completer.future().cancel(); + completionBarrier.open(); + deliveryBarrier.await(); + + assertThat(result).hasValue(null); + assertThat(failure).hasValue(null); + assertThat(deliveryThread).hasValue(null); + assertThat(cancelled).isTrue(); + } + + @Test + public void twoThreads_cancelAfterCompletion_success() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completer.complete("foobar"); + deliveryBarrier.open(); + }); + + deliveryBarrier.await(); + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completer.future().cancel(); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + assertThat(deliveryThread).hasValue(Thread.currentThread()); + assertThat(cancelled).isFalse(); + } + + @Test + public void twoThreads_cancelAfterCompletion_failure() throws InterruptedException { + AtomicBoolean cancelled = new AtomicBoolean(); + Completer completer = Completer.create(); + completer.onCancel(() -> cancelled.set(true)); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + AtomicReference deliveryThread = new AtomicReference<>(); + + Barrier deliveryBarrier = Barrier.noninterruptible(); + startThread(() -> { + completer.completeWithError(new TestException()); + deliveryBarrier.open(); + }); + + deliveryBarrier.await(); + completer.future().then((value, error) -> { + result.set(value); + failure.set(error); + deliveryThread.set(Thread.currentThread()); + }); + completer.future().cancel(); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + assertThat(deliveryThread).hasValue(Thread.currentThread()); + assertThat(cancelled).isFalse(); + } + + @Test + public void twoThreads_multipleCallbacks() throws InterruptedException { + Completer completer = Completer.create(); + + Barrier barrier = Barrier.noninterruptible(); + startThread(() -> { + completer.future().then((value, error) -> { + }); + barrier.open(); + }); + + barrier.await(); + assertThatCode(() -> { + completer.future().then((value, error) -> { + }); + }).isInstanceOf(IllegalStateException.class); + } + + // --- + // tests for the convenience helpers + + @Test + public void futureThenComplete_success() { + Completer completer1 = Completer.create(); + Completer completer2 = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer1.future().thenComplete(completer2); + + completer2.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + completer1.complete("foobar"); + + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + } + + @Test + public void futureThenComplete_failure() { + Completer completer1 = Completer.create(); + Completer completer2 = Completer.create(); + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + + completer1.future().thenComplete(completer2); + + completer2.future().then((value, error) -> { + result.set(value); + failure.set(error); + }); + + completer1.completeWithError(new TestException()); + + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + } + + @Test + public void futureOfValue() { + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + Future future = Future.of("foobar"); + future.then((value, error) -> { + result.set(value); + failure.set(error); + }); + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + } + + @Test + public void futureOfError() { + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + Future future = Future.ofError(new TestException()); + future.then((value, error) -> { + result.set(value); + failure.set(error); + }); + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + } + + @Test + public void futureFromValue() { + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + Future future = Future.from(() -> "foobar"); + future.then((value, error) -> { + result.set(value); + failure.set(error); + }); + assertThat(result).hasValue("foobar"); + assertThat(failure).hasValue(null); + } + + @Test + public void futureFromError() { + AtomicReference result = new AtomicReference<>(); + AtomicReference failure = new AtomicReference<>(); + Future future = Future.from(() -> { + throw new TestException(); + }); + future.then((value, error) -> { + result.set(value); + failure.set(error); + }); + assertThat(result).hasValue(null); + assertThat(failure).hasValueMatching(error -> error instanceof TestException); + } + + // --- + // tests for the blocking part of the API + + @Test + public void isComplete_success() { + Completer completer = Completer.create(); + + assertThat(completer.future().isComplete()).isFalse(); + + completer.complete("foobar"); + + assertThat(completer.future().isComplete()).isTrue(); + } + + @Test + public void isComplete_failure() { + Completer completer = Completer.create(); + + assertThat(completer.future().isComplete()).isFalse(); + + completer.completeWithError(new TestException()); + + assertThat(completer.future().isComplete()).isTrue(); + } + + @Test + public void awaitBlocking_completeBeforeBlocking_success() throws Throwable { + Completer completer = Completer.create(); + completer.complete("foobar"); + + assertThat(completer.future().awaitBlocking()).isEqualTo("foobar"); + } + + @Test + public void awaitBlocking_completeBeforeBlocking_failure() { + Completer completer = Completer.create(); + completer.completeWithError(new TestException()); + + assertThatCode(completer.future()::awaitBlocking).isExactlyInstanceOf(TestException.class); + } + + @Test + public void awaitBlocking_completeAfterBlocking_success() throws Throwable { + Completer completer = Completer.create(); + + startThread(() -> { + Thread.sleep(500); + completer.complete("foobar"); + }); + + assertThat(completer.future().awaitBlocking()).isEqualTo("foobar"); + } + + @Test + public void awaitBlocking_completeAfterBlocking_failure() { + Completer completer = Completer.create(); + + startThread(() -> { + Thread.sleep(500); + completer.completeWithError(new TestException()); + }); + + assertThatCode(completer.future()::awaitBlocking).isExactlyInstanceOf(TestException.class); + } +} diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/util/Action.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/util/Action.java index 7a27ae94..3e9d9d0a 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/util/Action.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/util/Action.java @@ -1,5 +1,17 @@ package io.smallrye.faulttolerance.core.util; +import static io.smallrye.faulttolerance.core.util.SneakyThrow.sneakyThrow; + public interface Action { void run() throws Exception; + + static void startThread(Action action) { + new Thread(() -> { + try { + action.run(); + } catch (Exception e) { + sneakyThrow(e); + } + }).start(); + } }