Skip to content

Commit

Permalink
create new strategies that can be used for both sync and async invoca…
Browse files Browse the repository at this point in the history
…tions

This usually consists of dropping the old synchronous implementation and
renaming and adapting the old asynchronous implementation (`CompletionStage*`).
Some of the new implementations still contain extra code for handling
synchronous interrupts, mainly to keep all unit tests passing.

The pseudo-async invocations (methods returning `java.util.concurrent.Future`)
still have a few dedicated strategies (`FutureExecution`, `FutureTimeout`),
but those are not supported in the programmatic API, which will benefit most
from this unification.

This commit also renames `InvocationContext` to `FaultToleranceContext` and
`InvocationContextEvent` to `FaultToleranceEvent`.
  • Loading branch information
Ladicek committed Nov 18, 2024
1 parent 354143f commit f6049f7
Show file tree
Hide file tree
Showing 111 changed files with 3,760 additions and 4,322 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@
* for synchronous methods, {@code FaultTolerance<CompletionStage<Object>>} for asynchronous
* methods that return {@code CompletionStage}, and so on. Note that this effectively precludes
* defining a useful fallback, because fallback can only be defined when the value type is known.
* <p>
* Note that this annotation has the same differences to the standard MicroProfile Fault Tolerance
* as {@link FaultTolerance}:
* <ul>
* <li>asynchronous actions of type {@link java.util.concurrent.Future} are not supported;</li>
* <li>the fallback, circuit breaker and retry strategies always inspect the cause chain of exceptions,
* following the behavior of SmallRye Fault Tolerance in the non-compatible mode.</li>
* </ul>
*/
@Inherited
@Documented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
Expand Down Expand Up @@ -53,7 +52,7 @@
* Tolerance and SmallRye Fault Tolerance. It shares the set of fault tolerance strategies, their invocation order
* and behavior, their configuration properties, etc. Notable differences are:
* <ul>
* <li>asynchronous actions of type {@link Future} are not supported;</li>
* <li>asynchronous actions of type {@link java.util.concurrent.Future} are not supported;</li>
* <li>the fallback, circuit breaker and retry strategies always inspect the cause chain of exceptions,
* following the behavior of SmallRye Fault Tolerance in the non-compatible mode.</li>
* </ul>
Expand Down Expand Up @@ -268,7 +267,7 @@ default Runnable adaptRunnable(Runnable action) {
interface Builder<T, R> {
/**
* Assigns a description to the resulting set of configured fault tolerance strategies. The description
* is used in logging messages and exception messages, and also as an identifier for metrics .
* is used in logging messages and exception messages, and also as an identifier for metrics.
* <p>
* The description may be an arbitrary string. Duplicates are permitted.
* <p>
Expand Down
48 changes: 34 additions & 14 deletions doc/modules/ROOT/pages/internals/core.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,37 @@ It is an interface that looks like this:
[source,java]
----
interface FaultToleranceStrategy<V> {
V apply(InvocationContext<V> ctx) throws Exception;
Future<V> apply(FaultToleranceContext<V> ctx);
}
----

The `InvocationContext` is a `Callable` that represents the method invocation guarded by this fault tolerance strategy.
The fault tolerance strategy does its work around `ctx.call()`.
It can catch exceptions, invoke `ctx.call()` multiple times, invoke something else, etc.
NOTE: The `Future` type here is _not_ a `java.util.concurrent.Future`.
It comes from {smallrye-fault-tolerance} and it could be described as a very bare-bones variant of `CompletableFuture`, except it's split into `Completer` and `Future`.
It is not supposed to be used outside of this project.

The `FaultToleranceContext` is a `Supplier<Future<V>>` that represents the method invocation guarded by this fault tolerance strategy.
The fault tolerance strategy does its work around `ctx.get()`.
It can catch exceptions, invoke `ctx.get()` multiple times, invoke something else, etc.
As an example, let's consider this strategy, applicable to methods that return a `String`:

[source,java]
----
public class MyStringFallback implements FaultToleranceStrategy<String> {
@Override
public String apply(InvocationContext<String> ctx) {
public Future<String> apply(FaultToleranceContext<String> ctx) {
Completer<String> completer = Completer.create();
try {
return ctx.call();
ctx.get().then((value, error) -> {
if (error == null) {
completer.complete(value);
} else {
completer.complete("my string value");
}
});
} catch (Exception ignored) {
return "my string value";
completer.complete("my string value");
}
return completer.future();
}
}
----
Expand All @@ -49,21 +61,29 @@ public class MyStringFallback implements FaultToleranceStrategy<String> {
}
@Override
public String apply(InvocationContext<String> ctx) {
public Future<String> apply(FaultToleranceContext<String> ctx) {
Completer<String> completer = Completer.create();
try {
return delegate.apply(ctx);
delegate.apply(ctx).then((value, error) -> {
if (error == null) {
completer.complete(value);
} else {
completer.complete("my string value");
}
});
} catch (Exception ignored) {
return "my string value";
completer.complete("my string value");
}
return completer.future();
}
}
----

We see that one strategy delegates to another, passing the `InvocationContext` along.
We see that one strategy delegates to another, passing the `FaultToleranceContext` along.
In fact, all the implementations in {smallrye-fault-tolerance} are written like this: they expect to be used in a chain, so they take another `FaultToleranceStrategy` to which they delegate.
But if all strategies have this form, when is `ctx.call()` actually invoked?
But if all strategies have this form, when is `ctx.get()` actually invoked?
Good question!
The ultimate `ctx.call()` invocation is done by a special fault tolerance strategy which is called, well, `Invocation`.
The ultimate `ctx.get()` invocation is done by a special fault tolerance strategy which is called, well, `Invocation`.

As an example which uses real {microprofile-fault-tolerance} annotations, let's consider this method:

Expand All @@ -85,7 +105,7 @@ Fallback(
Retry(
Timeout(
Invocation(
// ctx.call() will happen here
// ctx.get() will happen here
// that will, in turn, invoke doSomething()
)
)
Expand Down
8 changes: 8 additions & 0 deletions doc/modules/ROOT/pages/internals/project-structure.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@
Only required during {smallrye-fault-tolerance} build.
* `implementation/context-propagation`: Optional integration with MicroProfile Context Propagation.
See xref:integration/context-propagation.adoc[Context Propagation integration].
* `implementation/kotlin`: Optional integration with Kotlin.
See xref:integration/kotlin.adoc[Kotlin Integration Concerns].
* `implementation/mutiny`: Optional integration with Mutiny.
See xref:integration/async-types.adoc[Additional Asynchronous Types Integration Concerns].
* `implementation/rxjava3`: Optional integration with RxJava 3.
See xref:integration/async-types.adoc[Additional Asynchronous Types Integration Concerns].
* `implementation/standalone`: Standalone implementation of the {smallrye-fault-tolerance} programmatic API.
See xref:integration/programmatic-api.adoc[Programmatic API Integration Concerns].
* `implementation/tracing-propagation`: Optional integration between MicroProfile Context Propagation and OpenTracing.
See xref:integration/opentracing.adoc[OpenTracing integration].
* `implementation/vertx`: Optional integration of the Vert.x event loop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@

public final class FailureContext {
public final Throwable failure;
public final InvocationContext<?> invocationContext;
public final FaultToleranceContext<?> context;

public FailureContext(Throwable failure, InvocationContext<?> invocationContext) {
public FailureContext(Throwable failure, FaultToleranceContext<?> context) {
this.failure = Objects.requireNonNull(failure);
this.invocationContext = Objects.requireNonNull(invocationContext);
this.context = Objects.requireNonNull(context);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.smallrye.faulttolerance.core;

import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import java.util.function.Supplier;

public final class FaultToleranceContext<V> implements Supplier<Future<V>> {
private final Supplier<Future<V>> delegate;
private final boolean isAsync;

public FaultToleranceContext(Supplier<Future<V>> delegate, boolean isAsync) {
this.delegate = delegate;
this.isAsync = isAsync;
}

@Override
public Future<V> get() {
return delegate.get();
}

/**
* Whether the guarded operation is truly asynchronous (that is, returns
* a {@code CompletionStage} of the result, or some other asynchronous type).
*/
public boolean isAsync() {
return isAsync;
}

/**
* Whether the guarded operation is synchronous. This includes pseudo-asynchronous
* operations (that return a {@code Future} of the result).
*/
public boolean isSync() {
return !isAsync;
}

// arbitrary contextual data

private final ConcurrentMap<Class<?>, Object> data = new ConcurrentHashMap<>(4);

public <T> void set(Class<T> clazz, T object) {
data.put(clazz, object);
}

public <T> T remove(Class<T> clazz) {
return clazz.cast(data.remove(clazz));
}

public <T> T get(Class<T> clazz) {
return clazz.cast(data.get(clazz));
}

public <T> T get(Class<T> clazz, T defaultValue) {
T value = get(clazz);
return value != null ? value : defaultValue;
}

// out-of-band communication between fault tolerance strategies in a single chain

private final ConcurrentMap<Class<? extends FaultToleranceEvent>, Collection<Consumer<? extends FaultToleranceEvent>>> eventHandlers = new ConcurrentHashMap<>();

public <E extends FaultToleranceEvent> void registerEventHandler(Class<E> eventType, Consumer<E> handler) {
eventHandlers.computeIfAbsent(eventType, ignored -> new ConcurrentLinkedQueue<>()).add(handler);
}

public <E extends FaultToleranceEvent> void fireEvent(E event) {
Collection<Consumer<? extends FaultToleranceEvent>> handlers = eventHandlers.get(event.getClass());
if (handlers != null) {
for (Consumer<? extends FaultToleranceEvent> handler : handlers) {
@SuppressWarnings("unchecked")
Consumer<E> consumer = (Consumer<E>) handler;
consumer.accept(event);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.smallrye.faulttolerance.core;

public interface InvocationContextEvent {
public interface FaultToleranceEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
* just pass it down the chain.
* Usually, the last strategy will be {@link Invocation}.
* <p>
* The {@code Callable}s are wrapped in an {@link InvocationContext}, which also provides support for out-of-band
* The {@code Callable}s are wrapped in an {@link FaultToleranceContext}, which also provides support for out-of-band
* communication between fault tolerance strategies in a single chain.
* <p>
* The strategies must be thread-safe, as they are expected to be used simultaneously from multiple threads.
Expand All @@ -22,11 +22,10 @@
public interface FaultToleranceStrategy<V> {
/**
* Apply the fault tolerance strategy around the target {@link Callable}.
* The {@code Callable} is wrapped in an {@link InvocationContext}.
* The {@code Callable} is wrapped in an {@link FaultToleranceContext}.
*
* @param ctx the {@code InvocationContext} wrapping the {@code Callable} guarded by this fault tolerance strategy
* @return result computed by the target {@code Callable}
* @throws Exception if result couldn't be computed
*/
V apply(InvocationContext<V> ctx) throws Exception;
Future<V> apply(FaultToleranceContext<V> ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

/**
* A "sentinel" fault tolerance strategy that does no processing, it only invokes the guarded {@link Callable}.
* This is supposed to be used as the last fault tolerance stragegy in a chain.
* This is supposed to be used as the last fault tolerance strategy in a chain.
* <p>
* There's only one instance of this class, accessible using {@link #invocation()}.
*/
Expand All @@ -23,10 +23,12 @@ private Invocation() {
}

@Override
public V apply(InvocationContext<V> ctx) throws Exception {
public Future<V> apply(FaultToleranceContext<V> ctx) {
LOG.trace("Guarded method invocation started");
try {
return ctx.call();
return ctx.get();
} catch (Exception e) {
return Future.ofError(e);
} finally {
LOG.trace("Guarded method invocation finished");
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import java.util.function.Consumer;

import io.smallrye.faulttolerance.api.CircuitBreakerState;
import io.smallrye.faulttolerance.core.InvocationContext;
import io.smallrye.faulttolerance.core.FaultToleranceContext;
import io.smallrye.faulttolerance.core.bulkhead.BulkheadEvents;
import io.smallrye.faulttolerance.core.circuit.breaker.CircuitBreakerEvents;
import io.smallrye.faulttolerance.core.rate.limit.RateLimitEvents;
Expand Down Expand Up @@ -55,7 +55,7 @@ final class EventHandlers {
this.timeoutOnFinished = Callbacks.wrap(timeoutOnFinished);
}

void register(InvocationContext<?> ctx) {
void register(FaultToleranceContext<?> ctx) {
if (bulkheadOnAccepted != null || bulkheadOnRejected != null) {
ctx.registerEventHandler(BulkheadEvents.DecisionMade.class, event -> {
if (event.accepted) {
Expand Down
Loading

0 comments on commit f6049f7

Please sign in to comment.