From eb83fe2a2610bbda59765fd8cfa57b1b2c595609 Mon Sep 17 00:00:00 2001 From: Ladislav Thon Date: Thu, 12 Dec 2024 17:15:15 +0100 Subject: [PATCH] improve Vert.x tests The improvements mainly consist of: - running each service call on a separate duplicated context; - running each test on an event loop thread and on a worker thread. This better simulates what happens in practice (at least in Quarkus). Further, some new tests are added: - timeouts with Vert.x; - asynchronous execution with Vert.x, where the future is completed on a different kind of thread than the original. --- .../vertx/ContextDescription.java | 47 ++++++++ .../faulttolerance/vertx/ExecutionStyle.java | 7 ++ .../faulttolerance/vertx/VertxContext.java | 71 ++++++++++++ .../vertx/async/AsyncOnVertxThreadTest.java | 64 +++++++++++ .../faulttolerance/vertx/async/MyService.java | 43 ++++++++ .../AsyncBulkheadOnVertxThreadTest.java | 44 +++++--- .../vertx/bulkhead/MyService.java | 15 +-- .../AsyncBulkheadRetryOnVertxThreadTest.java | 45 +++++--- .../vertx/bulkhead/retry/MyService.java | 11 +- .../retry/AsyncRetryOnVertxThreadTest.java | 53 ++++++--- .../faulttolerance/vertx/retry/MyService.java | 16 +-- .../AsyncRetryFallbackOnVertxThreadTest.java | 52 ++++++--- .../vertx/retry/fallback/MyService.java | 16 +-- ...ryWithRequestContextOnVertxThreadTest.java | 51 ++++++--- .../vertx/retry/requestcontext/MyService.java | 10 +- .../AsyncTimeoutOnVertxThreadTest.java | 103 ++++++++++++++++++ .../vertx/timeout/MyService.java | 32 ++++++ 17 files changed, 576 insertions(+), 104 deletions(-) create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java create mode 100644 testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java new file mode 100644 index 000000000..8ff772cae --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ContextDescription.java @@ -0,0 +1,47 @@ +package io.smallrye.faulttolerance.vertx; + +import java.util.Locale; +import java.util.Objects; + +public final class ContextDescription { + public final ExecutionStyle executionStyle; + public final String contextClass; + public final String uuid; + public final String contextHash; + + ContextDescription(ExecutionStyle executionStyle, String contextClass, String uuid, String contextHash) { + this.executionStyle = executionStyle; + this.contextClass = contextClass; + this.contextHash = contextHash; + this.uuid = uuid; + } + + public boolean isDuplicatedContext() { + return "DuplicatedContext".equals(contextClass); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof ContextDescription)) { + return false; + } + ContextDescription that = (ContextDescription) o; + return Objects.equals(executionStyle, that.executionStyle) + && Objects.equals(contextClass, that.contextClass) + && Objects.equals(uuid, that.uuid) + && Objects.equals(contextHash, that.contextHash); + } + + @Override + public int hashCode() { + return Objects.hash(executionStyle, contextClass, uuid, contextHash); + } + + @Override + public String toString() { + return executionStyle.toString().toLowerCase(Locale.ROOT) + + "|" + contextClass + + "|" + uuid + + "|" + contextHash; + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java new file mode 100644 index 000000000..a8be3caab --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/ExecutionStyle.java @@ -0,0 +1,7 @@ +package io.smallrye.faulttolerance.vertx; + +public enum ExecutionStyle { + EVENT_LOOP, + WORKER, + UNKNOWN, +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java new file mode 100644 index 000000000..e764c1f83 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/VertxContext.java @@ -0,0 +1,71 @@ +package io.smallrye.faulttolerance.vertx; + +import java.util.UUID; + +import io.vertx.core.Context; +import io.vertx.core.impl.ContextInternal; + +// assumes that verticles are not used and no context is created explicitly, +// which means that all contexts are event loop contexts +public class VertxContext { + private final ContextInternal context; + + public static VertxContext current() { + return new VertxContext(ContextInternal.current()); + } + + private VertxContext(ContextInternal context) { + this.context = context; + } + + public VertxContext duplicate() { + return new VertxContext(context.duplicate()); + } + + public void execute(ExecutionStyle style, Runnable runnable) { + switch (style) { + case EVENT_LOOP: + context.runOnContext(ignored -> { + runnable.run(); + }); + break; + case WORKER: + context.executeBlocking(() -> { + runnable.run(); + return null; + }); + break; + default: + throw new UnsupportedOperationException("" + style); + } + } + + public void setTimer(long delayInMillis, Runnable runnable) { + boolean moveToWorker = Context.isOnWorkerThread(); + context.setTimer(delayInMillis, ignored -> { + if (moveToWorker) { + context.executeBlocking(() -> { + runnable.run(); + return null; + }); + } else { + runnable.run(); + } + }); + } + + public ContextDescription describe() { + String uuid = context.getLocal("my-uuid"); + if (uuid == null) { + uuid = UUID.randomUUID().toString(); + context.putLocal("my-uuid", uuid); + } + + ExecutionStyle executionStyle = Context.isOnEventLoopThread() + ? ExecutionStyle.EVENT_LOOP + : (Context.isOnWorkerThread() ? ExecutionStyle.WORKER : ExecutionStyle.UNKNOWN); + + return new ContextDescription(executionStyle, context.getClass().getSimpleName(), uuid, + "" + System.identityHashCode(context)); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java new file mode 100644 index 000000000..2594dde22 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/AsyncOnVertxThreadTest.java @@ -0,0 +1,64 @@ +package io.smallrye.faulttolerance.vertx.async; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.api.Condition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; + +public class AsyncOnVertxThreadTest extends AbstractVertxTest { + @BeforeEach + public void setUp() { + MyService.currentContexts.clear(); + } + + @Test + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + // 10 immediate calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); + + // 10 immediate calls: 4 identical items for each + assertThat(MyService.currentContexts).hasSize(40); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java new file mode 100644 index 000000000..ca6b007a0 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/async/MyService.java @@ -0,0 +1,43 @@ +package io.smallrye.faulttolerance.vertx.async; + +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; + +import jakarta.enterprise.context.ApplicationScoped; + +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; +import io.vertx.core.Context; + +@ApplicationScoped +public class MyService { + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); + + @AsynchronousNonBlocking + public CompletionStage hello() { + currentContexts.add(VertxContext.current().describe()); + + ExecutionStyle executionStyle; + if (Context.isOnEventLoopThread()) { + executionStyle = ExecutionStyle.WORKER; + } else if (Context.isOnWorkerThread()) { + executionStyle = ExecutionStyle.EVENT_LOOP; + } else { + throw new UnsupportedOperationException(); + } + + CompletableFuture result = new CompletableFuture<>(); + VertxContext.current().setTimer(1000, () -> { + currentContexts.add(VertxContext.current().describe()); + + VertxContext.current().execute(executionStyle, () -> { + result.complete("Hello!"); + }); + }); + return result; + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java index 99098120d..dd7bd5ca3 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/AsyncBulkheadOnVertxThreadTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -12,26 +14,43 @@ import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncBulkheadOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingBulkhead(MyService myService) { - CopyOnWriteArrayList results = new CopyOnWriteArrayList<>(); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); for (int i = 0; i < 10; i++) { - myService.hello().whenComplete((value, error) -> { - results.add(error == null ? value : error); + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); }); } }); - // 3 immediate invocations + 3 queued invocations + 4 rejected from bulkhead + // 3 immediate calls + 3 queued calls + 4 rejected from bulkhead await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); assertThat(results).haveExactly(6, @@ -39,12 +58,11 @@ public void nonblockingBulkhead(MyService myService) { assertThat(results).haveExactly(4, new Condition<>(it -> it instanceof BulkheadException, "failed result")); - // 3 immediate invocations + 3 queued invocations - // 2 identical items for each invocation - assertThat(MyService.invocationThreads).hasSize(12); - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 3 immediate calls + 3 queued calls: 4 identical items for each + // 4 rejected calls: 2 identical items for each + assertThat(MyService.currentContexts).hasSize(32); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java index 4c4e3d5c0..5780571fd 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/MyService.java @@ -9,21 +9,22 @@ import org.eclipse.microprofile.faulttolerance.Bulkhead; -import io.smallrye.common.annotation.NonBlocking; -import io.vertx.core.Vertx; +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); - @NonBlocking + @AsynchronousNonBlocking @Bulkhead(value = 3, waitingTaskQueue = 3) public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); CompletableFuture result = new CompletableFuture<>(); - Vertx.currentContext().owner().setTimer(1000, ignored -> { - invocationThreads.add(Thread.currentThread().getName()); + VertxContext.current().setTimer(1000, () -> { + currentContexts.add(VertxContext.current().describe()); result.complete("Hello!"); }); diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java index 2cb8dca7e..fa913c04b 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/AsyncBulkheadRetryOnVertxThreadTest.java @@ -3,6 +3,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; @@ -12,27 +14,43 @@ import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncBulkheadRetryOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingBulkhead(MyService myService) { - CopyOnWriteArrayList results = new CopyOnWriteArrayList<>(); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); for (int i = 0; i < 20; i++) { - myService.hello().whenComplete((value, error) -> { - results.add(error == null ? value : error); + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); }); } }); - // 3 immediate invocations + 3 immediately queued invocations + 6 successfully retried rejections from bulkhead - // + 8 unsuccessfully retried rejections from bulkhead + // 3 immediate calls + 3 immediately queued calls + 6 successfully retried rejections + 8 unsuccessfully retried rejections await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 20); assertThat(results).haveExactly(12, @@ -40,12 +58,11 @@ public void nonblockingBulkhead(MyService myService) { assertThat(results).haveExactly(8, new Condition<>(it -> it instanceof BulkheadException, "failed result")); - // 3 immediate invocations + 3 queued invocations + 6 successfully retried rejections from bulkhead - // 2 identical items for each invocation - assertThat(MyService.invocationThreads).hasSize(24); - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 3 immediate calls + 3 queued calls + 6 successfully retried rejections: 4 identical items for each + // 8 unsuccessfully retried rejections: 2 identical items for each + assertThat(MyService.currentContexts).hasSize(64); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(20); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java index 2aac30c83..092662b0b 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/bulkhead/retry/MyService.java @@ -11,17 +11,18 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; -import io.vertx.core.Vertx; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); @AsynchronousNonBlocking @Bulkhead(value = 3, waitingTaskQueue = 3) @Retry(maxRetries = 1, delay = 1000, jitter = 0) public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); // Note that the Vert.x timer is rather inaccurate. If the retry delay (as defined above) // is close to the completion delay (as defined below), this test may fail spuriously. @@ -29,8 +30,8 @@ public CompletionStage hello() { // the retry, which violates the basic assumption of this test. CompletableFuture result = new CompletableFuture<>(); - Vertx.currentContext().owner().setTimer(200, ignored -> { - invocationThreads.add(Thread.currentThread().getName()); + VertxContext.current().setTimer(200, () -> { + currentContexts.add(VertxContext.current().describe()); result.complete("Hello!"); }); diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java index 96fe272f8..6c9c42d18 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/AsyncRetryOnVertxThreadTest.java @@ -3,38 +3,63 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.AtomicInteger; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingRetry(MyService myService) { - AtomicReference result = new AtomicReference<>(null); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello(new AtomicInteger(0)).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } }); - await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); - assertThat(result.get()).isEqualTo("Hello!"); + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); - assertThat(MyService.invocationThreads).hasSize(11); // 1 initial invocation + 10 retries - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 10 calls, for each of them: 1 initial call + 10 retries + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(130); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java index 698875038..895509807 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/MyService.java @@ -1,10 +1,10 @@ package io.smallrye.faulttolerance.vertx.retry; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.time.temporal.ChronoUnit; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -14,21 +14,21 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); - - private final AtomicInteger counter = new AtomicInteger(0); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); @AsynchronousNonBlocking - @Retry(maxRetries = 20, delay = 5, delayUnit = ChronoUnit.MILLIS) - public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + @Retry(maxRetries = 20, delay = 5, delayUnit = ChronoUnit.MILLIS, jitter = 0) + public CompletionStage hello(AtomicInteger counter) { + currentContexts.add(VertxContext.current().describe()); int current = counter.incrementAndGet(); if (current > 10) { - return CompletableFuture.completedFuture("Hello!"); + return completedFuture("Hello!"); } return failedFuture(new Exception()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java index 54cf79c35..25389eae8 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/AsyncRetryFallbackOnVertxThreadTest.java @@ -3,38 +3,62 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; +import org.assertj.core.api.Condition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryFallbackOnVertxThreadTest extends AbstractVertxTest { @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); } @Test - public void nonblockingRetryFallback(MyService myService) { - AtomicReference result = new AtomicReference<>(null); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); runOnVertx(() -> { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } }); - await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); - assertThat(result.get()).isEqualTo("Hello fallback!"); + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); - assertThat(MyService.invocationThreads).hasSize(12); // 1 initial invocation + 10 retries + 1 fallback - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 10 calls, for each of them: 1 initial call + 10 retries + 1 fallback + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(140); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java index dc00ce92a..de1f67d63 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/fallback/MyService.java @@ -13,22 +13,24 @@ import org.eclipse.microprofile.faulttolerance.Fallback; import org.eclipse.microprofile.faulttolerance.Retry; -import io.smallrye.common.annotation.NonBlocking; +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); - @NonBlocking - @Retry(maxRetries = 10, delay = 5, delayUnit = ChronoUnit.MILLIS) + @AsynchronousNonBlocking + @Retry(maxRetries = 10, delay = 5, delayUnit = ChronoUnit.MILLIS, jitter = 0) @Fallback(fallbackMethod = "fallback") public CompletionStage hello() { - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); return failedFuture(new Exception()); } public CompletionStage fallback() { - invocationThreads.add(Thread.currentThread().getName()); - return completedFuture("Hello fallback!"); + currentContexts.add(VertxContext.current().describe()); + return completedFuture("Hello!"); } } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java index c5f6519b7..236bc0054 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/AsyncRetryWithRequestContextOnVertxThreadTest.java @@ -3,17 +3,20 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; +import java.util.HashSet; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import jakarta.enterprise.context.control.RequestContextController; import jakarta.inject.Inject; -import org.assertj.core.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; public class AsyncRetryWithRequestContextOnVertxThreadTest extends AbstractVertxTest { @Inject @@ -21,39 +24,51 @@ public class AsyncRetryWithRequestContextOnVertxThreadTest extends AbstractVertx @BeforeEach public void setUp() { - MyService.invocationThreads.clear(); + MyService.currentContexts.clear(); MyRequestScopedService.instanceIds.clear(); } @Test - public void nonblockingRetryWithRequestContext(MyService myService) { - Assertions.setMaxStackTraceElementsDisplayed(100); + public void eventLoop(MyService myService) { + test(myService, ExecutionStyle.EVENT_LOOP); + } + @Test + public void worker(MyService myService) { + test(myService, ExecutionStyle.WORKER); + } + + private void test(MyService myService, ExecutionStyle executionStyle) { AtomicReference result = new AtomicReference<>(null); runOnVertx(() -> { - boolean activated = rcc.activate(); - try { - myService.hello().whenComplete((value, error) -> { - result.set(error == null ? value : error); - }); - } finally { - if (activated) { - rcc.deactivate(); + VertxContext.current().duplicate().execute(executionStyle, () -> { + boolean activated = rcc.activate(); + try { + MyService.currentContexts.add(VertxContext.current().describe()); + myService.hello().whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + result.set(error == null ? value : error); + }); + } finally { + if (activated) { + rcc.deactivate(); + } } - } + }); }); await().atMost(5, TimeUnit.SECONDS).until(() -> result.get() != null); assertThat(result.get()).isEqualTo("Hello!"); - assertThat(MyService.invocationThreads).hasSize(11); // 1 initial invocation + 10 retries - assertThat(MyService.invocationThreads).allSatisfy(thread -> { - assertThat(thread).contains("vert.x-eventloop"); - }); - assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek()); + // 1 initial call + 10 retries + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(13); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(1); + // 1 initial invocation + 10 retries assertThat(MyRequestScopedService.instanceIds).hasSize(11); assertThat(MyRequestScopedService.instanceIds).containsOnly(MyRequestScopedService.instanceIds.peek()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java index 61d91057c..18912801e 100644 --- a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/retry/requestcontext/MyService.java @@ -1,10 +1,10 @@ package io.smallrye.faulttolerance.vertx.retry.requestcontext; +import static java.util.concurrent.CompletableFuture.completedFuture; import static java.util.concurrent.CompletableFuture.failedFuture; import java.time.temporal.ChronoUnit; import java.util.Queue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; @@ -15,10 +15,12 @@ import org.eclipse.microprofile.faulttolerance.Retry; import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; @ApplicationScoped public class MyService { - static final Queue invocationThreads = new ConcurrentLinkedQueue<>(); + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); private final AtomicInteger counter = new AtomicInteger(0); @@ -30,11 +32,11 @@ public class MyService { public CompletionStage hello() { requestScopedService.call(); - invocationThreads.add(Thread.currentThread().getName()); + currentContexts.add(VertxContext.current().describe()); int current = counter.incrementAndGet(); if (current > 10) { - return CompletableFuture.completedFuture("Hello!"); + return completedFuture("Hello!"); } return failedFuture(new Exception()); } diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java new file mode 100644 index 000000000..bff97c558 --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/AsyncTimeoutOnVertxThreadTest.java @@ -0,0 +1,103 @@ +package io.smallrye.faulttolerance.vertx.timeout; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; + +import org.assertj.core.api.Condition; +import org.eclipse.microprofile.faulttolerance.exceptions.TimeoutException; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.smallrye.faulttolerance.vertx.AbstractVertxTest; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.ExecutionStyle; +import io.smallrye.faulttolerance.vertx.VertxContext; + +public class AsyncTimeoutOnVertxThreadTest extends AbstractVertxTest { + @BeforeEach + public void setUp() { + MyService.currentContexts.clear(); + } + + @Test + public void timeout_eventLoop(MyService service) { + timeout(service, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void timeout_worker(MyService service) { + timeout(service, ExecutionStyle.WORKER); + } + + private void timeout(MyService service, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + service.hello(5000).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + // 10 calls + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>(it -> it instanceof TimeoutException, "failed result")); + + // 10 calls, for each of them: 1 before sleep + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(30); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } + + @Test + public void noTimeout_eventLoop(MyService service) { + noTimeout(service, ExecutionStyle.EVENT_LOOP); + } + + @Test + public void noTimeout_worker(MyService service) { + noTimeout(service, ExecutionStyle.WORKER); + } + + private void noTimeout(MyService service, ExecutionStyle executionStyle) { + List results = new CopyOnWriteArrayList<>(); + + runOnVertx(() -> { + VertxContext ctx = VertxContext.current(); + for (int i = 0; i < 10; i++) { + ctx.duplicate().execute(executionStyle, () -> { + MyService.currentContexts.add(VertxContext.current().describe()); + service.hello(50).whenComplete((value, error) -> { + MyService.currentContexts.add(VertxContext.current().describe()); + results.add(error == null ? value : error); + }); + }); + } + }); + + await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10); + + assertThat(results).haveExactly(10, + new Condition<>("Hello!"::equals, "successful result")); + + // 10 calls, for each of them: 1 before sleep + 1 after sleep + 1 before call + 1 after call + assertThat(MyService.currentContexts).hasSize(40); + assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle); + assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext); + assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10); + } +} diff --git a/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java new file mode 100644 index 000000000..098f5a7df --- /dev/null +++ b/testsuite/integration/src/test/java/io/smallrye/faulttolerance/vertx/timeout/MyService.java @@ -0,0 +1,32 @@ +package io.smallrye.faulttolerance.vertx.timeout; + +import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.ConcurrentLinkedQueue; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.faulttolerance.Timeout; + +import io.smallrye.faulttolerance.api.AsynchronousNonBlocking; +import io.smallrye.faulttolerance.vertx.ContextDescription; +import io.smallrye.faulttolerance.vertx.VertxContext; + +@ApplicationScoped +public class MyService { + static final Queue currentContexts = new ConcurrentLinkedQueue<>(); + + @Timeout + @AsynchronousNonBlocking + public CompletionStage hello(long sleep) { + currentContexts.add(VertxContext.current().describe()); + + CompletableFuture result = new CompletableFuture<>(); + VertxContext.current().setTimer(sleep, () -> { + currentContexts.add(VertxContext.current().describe()); + result.complete("Hello!"); + }); + return result; + } +}