Skip to content

Commit

Permalink
introduce a future implementation tailored to SmallRye Fault Toleranc…
Browse files Browse the repository at this point in the history
…e needs
  • Loading branch information
Ladicek committed Nov 15, 2024
1 parent 68c7e0b commit 354143f
Show file tree
Hide file tree
Showing 6 changed files with 1,065 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package io.smallrye.faulttolerance.core;

/**
* Creator and controller of a {@link Future}. An asynchronous computation that
* produces a future has to create a completer and return its future object.
* Afterwards, the completer is used to complete the future either with a value,
* using {@link #complete(Object) Completer.complete()}, or with an error, using
* {@link #completeWithError(Throwable) Completer.completeWithError()}.
* <p>
* If the completer is supplied a cancellation callback using {@link #onCancel(Runnable)},
* a successful cancellation request on the future calls the cancellation callback.
*
* @param <T> type of the result of the computation
*/
public interface Completer<T> {
/**
* Creates a new completer.
*
* @return a new completer; never {@code null}
* @param <T> type of the result of the computation
*/
static <T> Completer<T> create() {
return new FutureImpl<>();
}

/**
* Completes the future with a value, if pending.
* If the future is already complete, does nothing.
*
* @param value the value with which the future is completed; may be {@code null}
*/
void complete(T value);

/**
* Completes the future with an error, if pending.
* If the future is already complete, does nothing.
*
* @param error the error with which the future is completed; must not be {@code null}
*/
void completeWithError(Throwable error);

/**
* Sets the cancellation callback. Note that this method may be called at most once;
* subsequent calls will result in an exception.
*
* @param cancellationCallback the cancellation callback; may not be {@code null}
*/
void onCancel(Runnable cancellationCallback);

/**
* Returns the future created and controlled by this completer.
*
* @return the future; never {@code null}
*/
Future<T> future();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
package io.smallrye.faulttolerance.core;

import java.util.concurrent.Callable;
import java.util.function.BiConsumer;

/**
* Represents a computation that may still be running, and allows obtaining
* a result of the computation once available.
* <p>
* When the computation is still running, we say that the future is <em>pending</em>.
* Once the computation finishes, we say that the future is <em>complete</em>.
* Alternatively, after successful cancellation, we say the future is <em>cancelled</em>.
* The computation may finish with two kinds of outcomes: success, producing
* a <em>value</em>, and failure, producing an <em>error</em>.
* <p>
* To obtain the result, consumers of the future are expected to register
* a <em>completion callback</em> using {@link #then(BiConsumer) Future.then()}.
* The callback is called with a pair [value, error] once the future completes.
* To distinguish a sucessful outcome from a failure, the error should be
* tested. If the error is {@code null}, the outcome is successful. Note that
* a success may still produce a {@code null} value.
* <p>
* The callback may be registered before or after the future completes,
* and is guaranteed to be called exactly once. The thread on which the callback
* is called is not specified; it may be the thread that completes the future
* or the thread that registers the callback. Only one callback may be registered;
* attempts to register a second callback end up with an exception.
* <p>
* Future objects are created by a {@link Completer}. The {@link Completer} is
* also the only way through which the future may be completed. For convenience,
* static factory methods are provided to construct already complete futures:
* {@link #of(Object) Future.of()}, {@link #ofError(Throwable) Future.ofError()},
* and {@link #from(Callable) Future.from()}.
* <p>
* A future consumer may request cancellation of the computation by calling
* {@link #cancel() Future.cancel()}. This is only possible while the future is
* pending; when the future is already complete, this is a noop.
* <p>
* Unlike common {@code Future} abstractions, this one is fairly limited.
* There may only be one completion callback, and there are no combinators
* such as {@code map} or {@code flatMap}.
*
* @param <T> type of the result of the computation
*/
public interface Future<T> {
/**
* Returns a future that is already complete with given {@code value}.
*
* @param value the value; may be {@code null}
* @return the future that is already complete with the value; never {@code null}
* @param <T> type of the value
*/
static <T> Future<T> of(T value) {
Completer<T> completer = Completer.create();
completer.complete(value);
return completer.future();
}

/**
* Returns a future that is already complete with given {@code error}.
*
* @param error the error; must not be {@code null}
* @return the future that is already complete with the error; never {@code null}
* @param <T> type of hypothetical result; only for type inference
*/
static <T> Future<T> ofError(Throwable error) {
Completer<T> completer = Completer.create();
completer.completeWithError(error);
return completer.future();
}

/**
* Returns a future that is already complete with the outcome of given {@code callable}
* (which may be a returned value or a thrown error).
*
* @param callable the callable to call; must not be {@code null}
* @return the future that is complete with the outcome of the {@code callable}; never {@code null}
* @param <T> type of the result of given {@code callable}
*/
static <T> Future<T> from(Callable<T> callable) {
Completer<T> completer = Completer.create();
try {
T result = callable.call();
completer.complete(result);
} catch (Exception e) {
completer.completeWithError(e);
}
return completer.future();
}

/**
* Registers a completion callback with this future. The first argument
* of the {@link BiConsumer} is the value of the future, the second argument
* is the error.
* <p>
* Value may be {@code null} in case of a success, but error is never {@code null}
* in case of a failure. Therefore, idiomatic usage looks like:
*
* <pre>
* future.then((value, error) -&gt; {
* if (error == null) {
* ... use value ...
* } else {
* ... use error ...
* }
* });
* </pre>
*
* @param callback the completion callback to be registered; must not be {@code null}
*/
void then(BiConsumer<T, Throwable> callback);

/**
* Registers a completion callback with this future. The callback forwards
* the result of this future into the given completer.
*
* @param completer the completer to which the result of this future is forwarded;
* must not be {@code null}
*/
void thenComplete(Completer<T> completer);

/**
* Returns whether this future is complete.
*
* @return {@code true} if this future is complete, {@code false} otherwise
*/
boolean isComplete();

/**
* Returns whether this future is cancelled.
*
* @return {@code true} if this future is cancelled, {@code false} otherwise
*/
boolean isCancelled();

/**
* Blocks the calling thread until this future is complete or cancelled,
* and then returns the value of this future or throws the error, or throws
* {@link java.util.concurrent.CancellationException CancellationException}.
* In case this future is already complete or cancelled when this method
* is called, no blocking occurs.
* <p>
* The blocked thread may be interrupted, in which case this method throws
* {@link InterruptedException}.
* <p>
* This method should rarely be used without previous checking with {@link #isComplete()}
* or {@link #isCancelled()}.
*
* @return the value of this future; may be {@code null}
* @throws Throwable the error of this future, {@code CancellationException} or {@code InterruptedException}
*/
T awaitBlocking() throws Throwable;

/**
* Requests cancellation of the computation represented by this future.
*
* @see Completer
*/
void cancel();
}
Loading

0 comments on commit 354143f

Please sign in to comment.