From 2d55dd0768d3c6e5659b39ccb92b05abeb11d4f5 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Wed, 11 Dec 2024 17:21:51 +0100 Subject: [PATCH 1/3] fix timeout on event loop This commit mainly improves the `Timeout` strategy implementation to fail the invocation on timeout on the original thread, if it was remembered previously (that is, in case of an event loop). As a side effect, this commit also makes the `TimeoutExecution` class a little smaller. --- .../faulttolerance/core/timeout/Timeout.java | 31 ++++++++----------- .../core/timeout/TimeoutExecution.java | 24 +++----------- .../core/timeout/TestTimer.java | 6 ++-- .../core/timeout/TimeoutExecutionTest.java | 12 +++---- 4 files changed, 26 insertions(+), 47 deletions(-) diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java index 50a26b12a..aa5b8bf89 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/Timeout.java @@ -5,7 +5,7 @@ import static io.smallrye.faulttolerance.core.util.Preconditions.check; import static io.smallrye.faulttolerance.core.util.Preconditions.checkNotNull; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Executor; import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; @@ -39,26 +39,21 @@ public Future apply(FaultToleranceContext ctx) { ctx.fireEvent(TimeoutEvents.Started.INSTANCE); // must extract `FutureTimeoutNotification` early, because if retries are present, - // a different `FutureTimeoutNotification` may be present in the `InvocationContext` + // a different `FutureTimeoutNotification` may be present in the `FaultToleranceContext` // by the time the timeout callback is invoked FutureTimeoutNotification notification = ctx.remove(FutureTimeoutNotification.class); - AtomicBoolean completedWithTimeout = new AtomicBoolean(false); - Runnable onTimeout = () -> { - if (completedWithTimeout.compareAndSet(false, true)) { - LOG.debugf("%s invocation timed out (%d ms)", description, timeoutInMillis); - ctx.fireEvent(TimeoutEvents.Finished.TIMED_OUT); - TimeoutException timeout = new TimeoutException(description + " timed out"); - if (notification != null) { - notification.accept(timeout); - } - result.completeWithError(timeout); - } - }; - Thread executingThread = ctx.isSync() ? Thread.currentThread() : null; - TimeoutExecution execution = new TimeoutExecution(executingThread, timeoutInMillis, onTimeout); - TimerTask task = timer.schedule(execution.timeoutInMillis(), execution::timeoutAndInterrupt); + TimeoutExecution execution = new TimeoutExecution(executingThread, () -> { + LOG.debugf("%s invocation timed out (%d ms)", description, timeoutInMillis); + ctx.fireEvent(TimeoutEvents.Finished.TIMED_OUT); + TimeoutException timeout = new TimeoutException(description + " timed out"); + if (notification != null) { + notification.accept(timeout); + } + result.completeWithError(timeout); + }); + TimerTask task = timer.schedule(timeoutInMillis, execution::timeoutAndInterrupt, ctx.get(Executor.class)); Future originalResult; try { @@ -81,7 +76,7 @@ public Future apply(FaultToleranceContext ctx) { } if (execution.hasTimedOut()) { - onTimeout.run(); + // the "on timeout" callback is called by `execution::timeoutAndInterrupt` above } else if (error == null) { ctx.fireEvent(TimeoutEvents.Finished.NORMALLY); result.complete(value); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java index d01887dd0..a37504424 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecution.java @@ -1,5 +1,6 @@ package io.smallrye.faulttolerance.core.timeout; +import java.lang.invoke.ConstantBootstraps; import java.lang.invoke.MethodHandles; import java.lang.invoke.VarHandle; @@ -8,14 +9,8 @@ final class TimeoutExecution { private static final int STATE_FINISHED = 1; private static final int STATE_TIMED_OUT = 2; - private static final VarHandle STATE; - static { - try { - STATE = MethodHandles.lookup().findVarHandle(TimeoutExecution.class, "state", int.class); - } catch (ReflectiveOperationException e) { - throw new ExceptionInInitializerError(e); - } - } + private static final VarHandle STATE = ConstantBootstraps.fieldVarHandle(MethodHandles.lookup(), + "state", VarHandle.class, TimeoutExecution.class, int.class); private volatile int state; @@ -24,23 +19,12 @@ final class TimeoutExecution { // can be null, if no action shall be performed upon timeout private final Runnable timeoutAction; - private final long timeoutInMillis; - - TimeoutExecution(Thread executingThread, long timeoutInMillis) { - this(executingThread, timeoutInMillis, null); - } - - TimeoutExecution(Thread executingThread, long timeoutInMillis, Runnable timeoutAction) { + TimeoutExecution(Thread executingThread, Runnable timeoutAction) { this.state = STATE_RUNNING; this.executingThread = executingThread; - this.timeoutInMillis = timeoutInMillis; this.timeoutAction = timeoutAction; } - long timeoutInMillis() { - return timeoutInMillis; - } - boolean isRunning() { return state == STATE_RUNNING; } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java index 1a7a14c4d..cab9fd580 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TestTimer.java @@ -70,13 +70,13 @@ public boolean cancel() { @Override public TimerTask schedule(long delayInMillis, Runnable task, Executor executor) { - // not used in `Timeout` / `CompletionStageTimeout` - throw new UnsupportedOperationException(); + // in the test, the `executor` is always `null` + return schedule(delayInMillis, task); } @Override public int countScheduledTasks() { - // not used in `Timeout` / `CompletionStageTimeout` + // not used in `Timeout` throw new UnsupportedOperationException(); } diff --git a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecutionTest.java b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecutionTest.java index 2370c9046..411a7de7e 100644 --- a/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecutionTest.java +++ b/implementation/core/src/test/java/io/smallrye/faulttolerance/core/timeout/TimeoutExecutionTest.java @@ -9,10 +9,12 @@ public class TimeoutExecutionTest { private TimeoutExecution execution; + private AtomicBoolean timedOut; @BeforeEach public void setUp() { - execution = new TimeoutExecution(Thread.currentThread(), 1000L); + execution = new TimeoutExecution(Thread.currentThread(), () -> timedOut.set(true)); + timedOut = new AtomicBoolean(false); } @Test @@ -20,11 +22,6 @@ public void initialState() { assertThat(execution.isRunning()).isTrue(); } - @Test - public void timeoutValue() { - assertThat(execution.timeoutInMillis()).isEqualTo(1000L); - } - @Test public void finish() { AtomicBoolean flag = new AtomicBoolean(false); @@ -38,6 +35,7 @@ public void timeout() { execution.timeoutAndInterrupt(); assertThat(execution.hasTimedOut()).isTrue(); assertThat(Thread.interrupted()).isTrue(); // clear the current thread interruption status + assertThat(timedOut).isTrue(); } @Test @@ -49,6 +47,7 @@ public void timeoutAfterFinish() { assertThat(execution.hasTimedOut()).isFalse(); assertThat(flag).isTrue(); assertThat(Thread.currentThread().isInterrupted()).isFalse(); + assertThat(timedOut).isFalse(); } @Test @@ -60,5 +59,6 @@ public void finishAfterTimeout() { assertThat(execution.hasTimedOut()).isTrue(); assertThat(flag).isFalse(); assertThat(Thread.interrupted()).isTrue(); // clear the current thread interruption status + assertThat(timedOut).isTrue(); } } From 1fe4c4ab1c8768f3a9584b8b682c477b65b85764 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 12 Dec 2024 15:42:30 +0100 Subject: [PATCH 2/3] improve Vert.x event loop integration The `EventLoop` interface was simplified: the `isEventLoopThread()` method was removed and instead, the `executor()` method is supposed to return `null` if the current thread is not an event loop thread. Further, the `VertxEventLoop` implementation considers all Vert.x threads "event loops", even worker threads. Finally, the `VertxExecutor` and `ThreadOffload` classes make bigger effort to keep the execution on the original Vert.x context as remembered at the beginning by `RememberEventLoop`. --- .../core/FaultToleranceContext.java | 4 +++ .../core/async/RememberEventLoop.java | 5 +-- .../core/async/ThreadOffload.java | 35 ++++++++++++++----- .../core/event/loop/EventLoop.java | 9 +---- .../core/event/loop/NoEventLoop.java | 7 +--- .../faulttolerance/vertx/VertxEventLoop.java | 18 +++------- .../faulttolerance/vertx/VertxExecutor.java | 25 ++++++------- 7 files changed, 54 insertions(+), 49 deletions(-) diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FaultToleranceContext.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FaultToleranceContext.java index 1f04cabec..f1a63316b 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FaultToleranceContext.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/FaultToleranceContext.java @@ -48,6 +48,10 @@ public T remove(Class clazz) { return clazz.cast(data.remove(clazz)); } + public boolean has(Class clazz) { + return data.containsKey(clazz); + } + public T get(Class clazz) { return clazz.cast(data.get(clazz)); } diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java index 4ee6419b0..d47dff60b 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/RememberEventLoop.java @@ -29,8 +29,9 @@ public Future apply(FaultToleranceContext ctx) { LOG.trace("RememberEventLoopExecutor started"); try { - if (eventLoop.isEventLoopThread()) { - ctx.set(Executor.class, eventLoop.executor()); + Executor executor = eventLoop.executor(); + if (executor != null) { + ctx.set(Executor.class, executor); } return delegate.apply(ctx); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/ThreadOffload.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/ThreadOffload.java index 51d3c7d45..8c7ee2abd 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/ThreadOffload.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/async/ThreadOffload.java @@ -28,7 +28,8 @@ public ThreadOffload(FaultToleranceStrategy delegate, Executor executor, bool @Override public Future apply(FaultToleranceContext ctx) { // required for `@ApplyGuard` - if (!ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) { + boolean hasRememberedExecutor = ctx.has(Executor.class); + if (!hasRememberedExecutor && !ctx.get(ThreadOffloadEnabled.class, defaultEnabled).value) { return delegate.apply(ctx); } @@ -37,13 +38,31 @@ public Future apply(FaultToleranceContext ctx) { Executor executor = ctx.get(Executor.class, this.executor); Completer result = Completer.create(); - executor.execute(() -> { - try { - delegate.apply(ctx).thenComplete(result); - } catch (Exception e) { - result.completeWithError(e); - } - }); + if (hasRememberedExecutor) { + executor.execute(() -> { + try { + delegate.apply(ctx).then((value, error) -> { + executor.execute(() -> { + if (error == null) { + result.complete(value); + } else { + result.completeWithError(error); + } + }); + }); + } catch (Exception e) { + result.completeWithError(e); + } + }); + } else { + executor.execute(() -> { + try { + delegate.apply(ctx).thenComplete(result); + } catch (Exception e) { + result.completeWithError(e); + } + }); + } return result.future(); } finally { LOG.trace("ThreadOffload finished"); diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/EventLoop.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/EventLoop.java index 8b77a08cd..545e2cd06 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/EventLoop.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/EventLoop.java @@ -9,16 +9,9 @@ * Discovered using {@link ServiceLoader}. At most one implementation may be present on the classpath. */ public interface EventLoop { - /** - * Returns whether current thread is an event loop thread. - *

- * When this method returns {@code false}, calling {@link #executor()} - * doesn't make sense and throws {@link UnsupportedOperationException}. - */ - boolean isEventLoopThread(); - /** * Returns an {@link Executor} that runs tasks on the current thread's event loop. + * If the current thread is not an event loop thread, returns {@code null}. *

* Pay attention to when you call this method. If you want to later use an executor * for current thread's event loop, possibly even from a different thread, call this method diff --git a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/NoEventLoop.java b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/NoEventLoop.java index c238d57be..27e053481 100644 --- a/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/NoEventLoop.java +++ b/implementation/core/src/main/java/io/smallrye/faulttolerance/core/event/loop/NoEventLoop.java @@ -9,13 +9,8 @@ private NoEventLoop() { // avoid instantiation } - @Override - public boolean isEventLoopThread() { - return false; - } - @Override public Executor executor() { - throw new UnsupportedOperationException(); + return null; } } diff --git a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java index 72109372a..6cd5b3d4e 100644 --- a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java +++ b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxEventLoop.java @@ -7,20 +7,12 @@ import io.vertx.core.Vertx; public final class VertxEventLoop implements EventLoop { - @Override - public boolean isEventLoopThread() { - return Context.isOnEventLoopThread(); - } - - private void checkEventLoopThread() { - if (!isEventLoopThread()) { - throw new UnsupportedOperationException(); - } - } - @Override public Executor executor() { - checkEventLoopThread(); - return new VertxExecutor(Vertx.currentContext()); + if (Context.isOnVertxThread()) { + // all Vert.x threads are "event loops", even worker threads + return new VertxExecutor(Vertx.currentContext(), Context.isOnWorkerThread()); + } + return null; } } diff --git a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxExecutor.java b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxExecutor.java index 558a11215..bbb7c8c2b 100644 --- a/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxExecutor.java +++ b/implementation/vertx/src/main/java/io/smallrye/faulttolerance/vertx/VertxExecutor.java @@ -4,28 +4,29 @@ import io.smallrye.faulttolerance.core.util.RunnableWrapper; import io.vertx.core.Context; -import io.vertx.core.Vertx; final class VertxExecutor implements Executor { private final Context vertxContext; + private final boolean offloadToWorkerThread; - VertxExecutor(Context vertxContext) { + VertxExecutor(Context vertxContext, boolean offloadToWorkerThread) { this.vertxContext = vertxContext; + this.offloadToWorkerThread = offloadToWorkerThread; } @Override public void execute(Runnable runnable) { - // fast path: if we're on the correct event loop thread already, - // we can run the task directly - if (Vertx.currentContext() == vertxContext) { - runnable.run(); - return; - } - Runnable wrappedRunnable = RunnableWrapper.INSTANCE.wrap(runnable); - vertxContext.runOnContext(ignored -> { - wrappedRunnable.run(); - }); + if (vertxContext.isEventLoopContext() && offloadToWorkerThread) { + vertxContext.executeBlocking(() -> { + wrappedRunnable.run(); + return null; + }); + } else { + vertxContext.runOnContext(ignored -> { + wrappedRunnable.run(); + }); + } } } From 8dadb5d8bde0973de5e4175d30e3b2c20c4f57dc Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 12 Dec 2024 17:15:15 +0100 Subject: [PATCH 3/3] improve Vert.x tests The improvements mainly consist of: - running each service call on a separate duplicated context; - running each test on an event loop thread and on a worker thread. This better simulates what happens in practice (at least in Quarkus). Further, some new tests are added: - timeouts with Vert.x; - asynchronous execution with Vert.x, where the future is completed on a different kind of thread than the original. --- .../vertx/ContextDescription.java | 47 ++++++++ .../faulttolerance/vertx/ExecutionStyle.java | 7 ++ .../faulttolerance/vertx/VertxContext.java | 71 ++++++++++++ .../vertx/async/AsyncOnVertxThreadTest.java | 64 +++++++++++ .../faulttolerance/vertx/async/MyService.java | 43 ++++++++ .../AsyncBulkheadOnVertxThreadTest.java | 44 +++++--- .../vertx/bulkhead/MyService.java | 15 +-- .../AsyncBulkheadRetryOnVertxThreadTest.java | 45 +++++--- .../vertx/bulkhead/retry/MyService.java | 11 +- .../retry/AsyncRetryOnVertxThreadTest.java | 53 ++++++--- .../faulttolerance/vertx/retry/MyService.java | 16 +-- .../AsyncRetryFallbackOnVertxThreadTest.java | 52 ++++++--- .../vertx/retry/fallback/MyService.java | 16 +-- ...ryWithRequestContextOnVertxThreadTest.java | 51 ++++++--- .../vertx/retry/requestcontext/MyService.java | 10 +- .../AsyncTimeoutOnVertxThreadTest.java | 103 ++++++++++++++++++ .../vertx/timeout/MyService.java | 32 ++++++ 17 files changed, 576 insertions(+), 104 deletions(-) create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java new file mode 100644 index 000000000..8ff772cae --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java @@ -0,0 +1,47 @@ +package io.smallrye.faulttolerance.vertx; + +import java.util.Locale; +import java.util.Objects; + +public final class ContextDescription { + public final ExecutionStyle executionStyle; + public final String contextClass; + public final String uuid; + public final String contextHash; + + ContextDescription(ExecutionStyle executionStyle, String contextClass, String uuid, String contextHash) { + this.executionStyle = executionStyle; + this.contextClass = contextClass; + this.contextHash = contextHash; + this.uuid = uuid; + } + + public boolean isDuplicatedContext() { + return "DuplicatedContext".equals(contextClass); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ContextDescription)) { + return false; + } + ContextDescription that = (ContextDescription) o; + return Objects.equals(executionStyle, that.executionStyle) + && Objects.equals(contextClass, that.contextClass) + && Objects.equals(uuid, that.uuid) + && Objects.equals(contextHash, that.contextHash); + } + + @Override + public int hashCode() { + return Objects.hash(executionStyle, contextClass, uuid, contextHash); + } + + @Override + public String toString() { + return executionStyle.toString().toLowerCase(Locale.ROOT) + + "|" + contextClass + + "|" + uuid + + "|" + contextHash; + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java new file mode 100644 index 000000000..a8be3caab --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java @@ -0,0 +1,7 @@ +package io.smallrye.faulttolerance.vertx; + +public enum ExecutionStyle { + EVENT_LOOP, + WORKER, + UNKNOWN, +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java new file mode 100644 index 000000000..e764c1f83 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java @@ -0,0 +1,71 @@ +package io.smallrye.faulttolerance.vertx; + +import java.util.UUID; + +import io.vertx.core.Context; +import io.vertx.core.impl.ContextInternal; + +// assumes that verticles are not used and no context is created explicitly, +// which means that all contexts are event loop contexts +public class VertxContext { + private final ContextInternal context; + + public static VertxContext current() { + return new VertxContext(ContextInternal.current()); + } + + private VertxContext(ContextInternal context) { + this.context = context; + } + + public VertxContext duplicate() { + return new VertxContext(context.duplicate()); + } + + public void execute(ExecutionStyle style, Runnable runnable) { + switch (style) { + case EVENT_LOOP: + context.runOnContext(ignored -> { + runnable.run(); + }); + break; + case WORKER: + context.executeBlocking(() -> { + runnable.run(); + return null; + }); + break; + default: + throw new UnsupportedOperationException("" + style); + } + } + + public void setTimer(long delayInMillis, Runnable runnable) { + boolean moveToWorker = Context.isOnWorkerThread(); + context.setTimer(delayInMillis, ignored -> { + if (moveToWorker) { + context.executeBlocking(() -> { + runnable.run(); + return null; + }); + } else { + runnable.run(); + } + }); + } + + public ContextDescription describe() { + String uuid = context.getLocal("my-uuid"); + if (uuid == null) { + uuid = UUID.randomUUID().toString(); + context.putLocal("my-uuid", uuid); + } + + ExecutionStyle executionStyle = Context.isOnEventLoopThread() + ? ExecutionStyle.EVENT_LOOP + : (Context.isOnWorkerThread() ? ExecutionStyle.WORKER : ExecutionStyle.UNKNOWN); + + return new ContextDescription(executionStyle, context.getClass().getSimpleName(), uuid, + "" + System.identityHashCode(context)); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java new file mode 100644 index 000000000..2594dde22 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java @@ -0,0 +1,64 @@ +package io.smallrye.faulttolerance.vertx.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; + +public class AsyncOnVertxThreadTest extends AbstractVertxTest { + @BeforeEach + public void setUp() { + MyService.currentContexts.clear(); + } + + @Test + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + // 10 immediate calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); + + // 10 immediate calls: 4 identical items for each + assertThat(MyService.currentContexts).hasSize(40); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java new file mode 100644 index 000000000..ca6b007a0 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java @@ -0,0 +1,43 @@ +package io.smallrye.faulttolerance.vertx.async; + +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; +import io.vertx.core.Context; + +@ApplicationScoped +public class MyService { + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); + + @AsynchronousNonBlocking + public CompletionStage hello() { + currentContexts.add(VertxContext.current().describe()); + + ExecutionStyle executionStyle; + if (Context.isOnEventLoopThread()) { + executionStyle = ExecutionStyle.WORKER; + } else if (Context.isOnWorkerThread()) { + executionStyle = ExecutionStyle.EVENT_LOOP; + } else { + throw new UnsupportedOperationException(); + } + + CompletableFuture result = new CompletableFuture<>(); + VertxContext.current().setTimer(1000, () -> { + currentContexts.add(VertxContext.current().describe()); + + VertxContext.current().execute(executionStyle, () -> { + result.complete("Hello!"); + }); + }); + return result; + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java index 99098120d..dd7bd5ca3 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -12,26 +14,43 @@ import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncBulkheadOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingBulkhead(MyService myService) { - CopyOnWriteArrayList results = new CopyOnWriteArrayList<>(); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); for (int i = 0; i < 10; i++) { - myService.hello().whenComplete((value, error) -> { - results.add(error == null ? value : error); + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); }); } }); - // 3 immediate invocations + 3 queued invocations + 4 rejected from bulkhead + // 3 immediate calls + 3 queued calls + 4 rejected from bulkhead await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); assertThat(results).haveExactly(6, @@ -39,12 +58,11 @@ public void nonblockingBulkhead(MyService myService) { assertThat(results).haveExactly(4, new Condition<>(it -> it instanceof BulkheadException, "failed result")); - // 3 immediate invocations + 3 queued invocations - // 2 identical items for each invocation - assertThat(MyService.invocationThreads).hasSize(12); - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 3 immediate calls + 3 queued calls: 4 identical items for each + // 4 rejected calls: 2 identical items for each + assertThat(MyService.currentContexts).hasSize(32); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java index 4c4e3d5c0..5780571fd 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java @@ -9,21 +9,22 @@ import org.eclipse.microprofile.faulttolerance.Bulkhead; -import io.smallrye.common.annotation.NonBlocking; -import io.vertx.core.Vertx; +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); - @NonBlocking + @AsynchronousNonBlocking @Bulkhead(value = 3, waitingTaskQueue = 3) public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); CompletableFuture result = new CompletableFuture<>(); - Vertx.currentContext().owner().setTimer(1000, ignored -> { - invocationThreads.add(Thread.currentThread().getName()); + VertxContext.current().setTimer(1000, () -> { + currentContexts.add(VertxContext.current().describe()); result.complete("Hello!"); }); diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java index 2cb8dca7e..fa913c04b 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -12,27 +14,43 @@ import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncBulkheadRetryOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingBulkhead(MyService myService) { - CopyOnWriteArrayList results = new CopyOnWriteArrayList<>(); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); for (int i = 0; i < 20; i++) { - myService.hello().whenComplete((value, error) -> { - results.add(error == null ? value : error); + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); }); } }); - // 3 immediate invocations + 3 immediately queued invocations + 6 successfully retried rejections from bulkhead - // + 8 unsuccessfully retried rejections from bulkhead + // 3 immediate calls + 3 immediately queued calls + 6 successfully retried rejections + 8 unsuccessfully retried rejections await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 20); assertThat(results).haveExactly(12, @@ -40,12 +58,11 @@ public void nonblockingBulkhead(MyService myService) { assertThat(results).haveExactly(8, new Condition<>(it -> it instanceof BulkheadException, "failed result")); - // 3 immediate invocations + 3 queued invocations + 6 successfully retried rejections from bulkhead - // 2 identical items for each invocation - assertThat(MyService.invocationThreads).hasSize(24); - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 3 immediate calls + 3 queued calls + 6 successfully retried rejections: 4 identical items for each + // 8 unsuccessfully retried rejections: 2 identical items for each + assertThat(MyService.currentContexts).hasSize(64); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(20); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java index 2aac30c83..092662b0b 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java @@ -11,17 +11,18 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; -import io.vertx.core.Vertx; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); @AsynchronousNonBlocking @Bulkhead(value = 3, waitingTaskQueue = 3) @Retry(maxRetries = 1, delay = 1000, jitter = 0) public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); // Note that the Vert.x timer is rather inaccurate. If the retry delay (as defined above) // is close to the completion delay (as defined below), this test may fail spuriously. @@ -29,8 +30,8 @@ public CompletionStage hello() { // the retry, which violates the basic assumption of this test. CompletableFuture result = new CompletableFuture<>(); - Vertx.currentContext().owner().setTimer(200, ignored -> { - invocationThreads.add(Thread.currentThread().getName()); + VertxContext.current().setTimer(200, () -> { + currentContexts.add(VertxContext.current().describe()); result.complete("Hello!"); }); diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java index 96fe272f8..6c9c42d18 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java @@ -3,38 +3,63 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingRetry(MyService myService) { - AtomicReference result = new AtomicReference<>(null); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello(new AtomicInteger(0)).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } }); - await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); - assertThat(result.get()).isEqualTo("Hello!"); + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); - assertThat(MyService.invocationThreads).hasSize(11); // 1 initial invocation + 10 retries - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 10 calls, for each of them: 1 initial call + 10 retries + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(130); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java index 698875038..895509807 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java @@ -1,10 +1,10 @@ package io.smallrye.faulttolerance.vertx.retry; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.time.temporal.ChronoUnit; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -14,21 +14,21 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); - - private final AtomicInteger counter = new AtomicInteger(0); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); @AsynchronousNonBlocking - @Retry(maxRetries = 20, delay = 5, delayUnit = ChronoUnit.MILLIS) - public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + @Retry(maxRetries = 20, delay = 5, delayUnit = ChronoUnit.MILLIS, jitter = 0) + public CompletionStage hello(AtomicInteger counter) { + currentContexts.add(VertxContext.current().describe()); int current = counter.incrementAndGet(); if (current > 10) { - return CompletableFuture.completedFuture("Hello!"); + return completedFuture("Hello!"); } return failedFuture(new Exception()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java index 54cf79c35..25389eae8 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java @@ -3,38 +3,62 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryFallbackOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingRetryFallback(MyService myService) { - AtomicReference result = new AtomicReference<>(null); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } }); - await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); - assertThat(result.get()).isEqualTo("Hello fallback!"); + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); - assertThat(MyService.invocationThreads).hasSize(12); // 1 initial invocation + 10 retries + 1 fallback - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 10 calls, for each of them: 1 initial call + 10 retries + 1 fallback + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(140); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java index dc00ce92a..de1f67d63 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java @@ -13,22 +13,24 @@ import org.eclipse.microprofile.faulttolerance.Fallback; import org.eclipse.microprofile.faulttolerance.Retry; -import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); - @NonBlocking - @Retry(maxRetries = 10, delay = 5, delayUnit = ChronoUnit.MILLIS) + @AsynchronousNonBlocking + @Retry(maxRetries = 10, delay = 5, delayUnit = ChronoUnit.MILLIS, jitter = 0) @Fallback(fallbackMethod = "fallback") public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); return failedFuture(new Exception()); } public CompletionStage fallback() { - invocationThreads.add(Thread.currentThread().getName()); - return completedFuture("Hello fallback!"); + currentContexts.add(VertxContext.current().describe()); + return completedFuture("Hello!"); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java index c5f6519b7..236bc0054 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java @@ -3,17 +3,20 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import jakarta.enterprise.context.control.RequestContextController; import jakarta.inject.Inject; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryWithRequestContextOnVertxThreadTest extends AbstractVertxTest { @Inject @@ -21,39 +24,51 @@ public class AsyncRetryWithRequestContextOnVertxThreadTest extends AbstractVertx @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); MyRequestScopedService.instanceIds.clear(); } @Test - public void nonblockingRetryWithRequestContext(MyService myService) { - Assertions.setMaxStackTraceElementsDisplayed(100); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { AtomicReference result = new AtomicReference<>(null); runOnVertx(() -> { - boolean activated = rcc.activate(); - try { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); - } finally { - if (activated) { - rcc.deactivate(); + VertxContext.current().duplicate().execute(executionStyle, () -> { + boolean activated = rcc.activate(); + try { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + result.set(error == null ? value : error); + }); + } finally { + if (activated) { + rcc.deactivate(); + } } - } + }); }); await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); assertThat(result.get()).isEqualTo("Hello!"); - assertThat(MyService.invocationThreads).hasSize(11); // 1 initial invocation + 10 retries - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 1 initial call + 10 retries + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(13); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(1); + // 1 initial invocation + 10 retries assertThat(MyRequestScopedService.instanceIds).hasSize(11); assertThat(MyRequestScopedService.instanceIds).containsOnly(MyRequestScopedService.instanceIds.peek()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java index 61d91057c..18912801e 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java @@ -1,10 +1,10 @@ package io.smallrye.faulttolerance.vertx.retry.requestcontext; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.time.temporal.ChronoUnit; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -15,10 +15,12 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); private final AtomicInteger counter = new AtomicInteger(0); @@ -30,11 +32,11 @@ public class MyService { public CompletionStage hello() { requestScopedService.call(); - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); int current = counter.incrementAndGet(); if (current > 10) { - return CompletableFuture.completedFuture("Hello!"); + return completedFuture("Hello!"); } return failedFuture(new Exception()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java new file mode 100644 index 000000000..bff97c558 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java @@ -0,0 +1,103 @@ +package io.smallrye.faulttolerance.vertx.timeout; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.api.Condition; +import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; + +public class AsyncTimeoutOnVertxThreadTest extends AbstractVertxTest { + @BeforeEach + public void setUp() { + MyService.currentContexts.clear(); + } + + @Test + public void timeout_eventLoop(MyService service) { + timeout(service, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void timeout_worker(MyService service) { + timeout(service, ExecutionStyle.WORKER); + } + + private void timeout(MyService service, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + service.hello(5000).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>(it -> it instanceof TimeoutException, "failed result")); + + // 10 calls, for each of them: 1 before sleep + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(30); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } + + @Test + public void noTimeout_eventLoop(MyService service) { + noTimeout(service, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void noTimeout_worker(MyService service) { + noTimeout(service, ExecutionStyle.WORKER); + } + + private void noTimeout(MyService service, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + service.hello(50).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); + + // 10 calls, for each of them: 1 before sleep + 1 after sleep + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(40); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java new file mode 100644 index 000000000..098f5a7df --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java @@ -0,0 +1,32 @@ +package io.smallrye.faulttolerance.vertx.timeout; + +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.faulttolerance.Timeout; + +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; + +@ApplicationScoped +public class MyService { + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); + + @Timeout + @AsynchronousNonBlocking + public CompletionStage hello(long sleep) { + currentContexts.add(VertxContext.current().describe()); + + CompletableFuture result = new CompletableFuture<>(); + VertxContext.current().setTimer(sleep, () -> { + currentContexts.add(VertxContext.current().describe()); + result.complete("Hello!"); + }); + return result; + } +}