Skip to content

Commit

Permalink
improve Vert.x tests
Browse files Browse the repository at this point in the history
The improvements mainly consist of:

- running each service call on a separate duplicated context;
- running each test on an event loop thread and on a worker thread.

This better simulates what happens in practice (at least in Quarkus).

Further, some new tests are added:

- timeouts with Vert.x;
- asynchronous execution with Vert.x, where the future is completed
  on a different kind of thread than the original.
  • Loading branch information
Ladicek committed Jan 6, 2025
1 parent 11ffb4e commit eb83fe2
Show file tree
Hide file tree
Showing 17 changed files with 576 additions and 104 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.smallrye.faulttolerance.vertx;

import java.util.Locale;
import java.util.Objects;

public final class ContextDescription {
public final ExecutionStyle executionStyle;
public final String contextClass;
public final String uuid;
public final String contextHash;

ContextDescription(ExecutionStyle executionStyle, String contextClass, String uuid, String contextHash) {
this.executionStyle = executionStyle;
this.contextClass = contextClass;
this.contextHash = contextHash;
this.uuid = uuid;
}

public boolean isDuplicatedContext() {
return "DuplicatedContext".equals(contextClass);
}

@Override
public boolean equals(Object o) {
if (!(o instanceof ContextDescription)) {
return false;
}
ContextDescription that = (ContextDescription) o;
return Objects.equals(executionStyle, that.executionStyle)
&& Objects.equals(contextClass, that.contextClass)
&& Objects.equals(uuid, that.uuid)
&& Objects.equals(contextHash, that.contextHash);
}

@Override
public int hashCode() {
return Objects.hash(executionStyle, contextClass, uuid, contextHash);
}

@Override
public String toString() {
return executionStyle.toString().toLowerCase(Locale.ROOT)
+ "|" + contextClass
+ "|" + uuid
+ "|" + contextHash;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.smallrye.faulttolerance.vertx;

public enum ExecutionStyle {
EVENT_LOOP,
WORKER,
UNKNOWN,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.smallrye.faulttolerance.vertx;

import java.util.UUID;

import io.vertx.core.Context;
import io.vertx.core.impl.ContextInternal;

// assumes that verticles are not used and no context is created explicitly,
// which means that all contexts are event loop contexts
public class VertxContext {
private final ContextInternal context;

public static VertxContext current() {
return new VertxContext(ContextInternal.current());
}

private VertxContext(ContextInternal context) {
this.context = context;
}

public VertxContext duplicate() {
return new VertxContext(context.duplicate());
}

public void execute(ExecutionStyle style, Runnable runnable) {
switch (style) {
case EVENT_LOOP:
context.runOnContext(ignored -> {
runnable.run();
});
break;
case WORKER:
context.executeBlocking(() -> {
runnable.run();
return null;
});
break;
default:
throw new UnsupportedOperationException("" + style);
}
}

public void setTimer(long delayInMillis, Runnable runnable) {
boolean moveToWorker = Context.isOnWorkerThread();
context.setTimer(delayInMillis, ignored -> {
if (moveToWorker) {
context.executeBlocking(() -> {
runnable.run();
return null;
});
} else {
runnable.run();
}
});
}

public ContextDescription describe() {
String uuid = context.getLocal("my-uuid");
if (uuid == null) {
uuid = UUID.randomUUID().toString();
context.putLocal("my-uuid", uuid);
}

ExecutionStyle executionStyle = Context.isOnEventLoopThread()
? ExecutionStyle.EVENT_LOOP
: (Context.isOnWorkerThread() ? ExecutionStyle.WORKER : ExecutionStyle.UNKNOWN);

return new ContextDescription(executionStyle, context.getClass().getSimpleName(), uuid,
"" + System.identityHashCode(context));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package io.smallrye.faulttolerance.vertx.async;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

import org.assertj.core.api.Condition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.vertx.AbstractVertxTest;
import io.smallrye.faulttolerance.vertx.ContextDescription;
import io.smallrye.faulttolerance.vertx.ExecutionStyle;
import io.smallrye.faulttolerance.vertx.VertxContext;

public class AsyncOnVertxThreadTest extends AbstractVertxTest {
@BeforeEach
public void setUp() {
MyService.currentContexts.clear();
}

@Test
public void eventLoop(MyService myService) {
test(myService, ExecutionStyle.EVENT_LOOP);
}

@Test
public void worker(MyService myService) {
test(myService, ExecutionStyle.WORKER);
}

private void test(MyService myService, ExecutionStyle executionStyle) {
List<Object> results = new CopyOnWriteArrayList<>();

runOnVertx(() -> {
VertxContext ctx = VertxContext.current();
for (int i = 0; i < 10; i++) {
ctx.duplicate().execute(executionStyle, () -> {
MyService.currentContexts.add(VertxContext.current().describe());
myService.hello().whenComplete((value, error) -> {
MyService.currentContexts.add(VertxContext.current().describe());
results.add(error == null ? value : error);
});
});
}
});

// 10 immediate calls
await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10);

assertThat(results).haveExactly(10,
new Condition<>("Hello!"::equals, "successful result"));

// 10 immediate calls: 4 identical items for each
assertThat(MyService.currentContexts).hasSize(40);
assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle);
assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext);
assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.smallrye.faulttolerance.vertx.async;

import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;

import jakarta.enterprise.context.ApplicationScoped;

import io.smallrye.faulttolerance.api.AsynchronousNonBlocking;
import io.smallrye.faulttolerance.vertx.ContextDescription;
import io.smallrye.faulttolerance.vertx.ExecutionStyle;
import io.smallrye.faulttolerance.vertx.VertxContext;
import io.vertx.core.Context;

@ApplicationScoped
public class MyService {
static final Queue<ContextDescription> currentContexts = new ConcurrentLinkedQueue<>();

@AsynchronousNonBlocking
public CompletionStage<String> hello() {
currentContexts.add(VertxContext.current().describe());

ExecutionStyle executionStyle;
if (Context.isOnEventLoopThread()) {
executionStyle = ExecutionStyle.WORKER;
} else if (Context.isOnWorkerThread()) {
executionStyle = ExecutionStyle.EVENT_LOOP;
} else {
throw new UnsupportedOperationException();
}

CompletableFuture<String> result = new CompletableFuture<>();
VertxContext.current().setTimer(1000, () -> {
currentContexts.add(VertxContext.current().describe());

VertxContext.current().execute(executionStyle, () -> {
result.complete("Hello!");
});
});
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;

Expand All @@ -12,39 +14,55 @@
import org.junit.jupiter.api.Test;

import io.smallrye.faulttolerance.vertx.AbstractVertxTest;
import io.smallrye.faulttolerance.vertx.ContextDescription;
import io.smallrye.faulttolerance.vertx.ExecutionStyle;
import io.smallrye.faulttolerance.vertx.VertxContext;

public class AsyncBulkheadOnVertxThreadTest extends AbstractVertxTest {
@BeforeEach
public void setUp() {
MyService.invocationThreads.clear();
MyService.currentContexts.clear();
}

@Test
public void nonblockingBulkhead(MyService myService) {
CopyOnWriteArrayList<Object> results = new CopyOnWriteArrayList<>();
public void eventLoop(MyService myService) {
test(myService, ExecutionStyle.EVENT_LOOP);
}

@Test
public void worker(MyService myService) {
test(myService, ExecutionStyle.WORKER);
}

private void test(MyService myService, ExecutionStyle executionStyle) {
List<Object> results = new CopyOnWriteArrayList<>();

runOnVertx(() -> {
VertxContext ctx = VertxContext.current();
for (int i = 0; i < 10; i++) {
myService.hello().whenComplete((value, error) -> {
results.add(error == null ? value : error);
ctx.duplicate().execute(executionStyle, () -> {
MyService.currentContexts.add(VertxContext.current().describe());
myService.hello().whenComplete((value, error) -> {
MyService.currentContexts.add(VertxContext.current().describe());
results.add(error == null ? value : error);
});
});
}
});

// 3 immediate invocations + 3 queued invocations + 4 rejected from bulkhead
// 3 immediate calls + 3 queued calls + 4 rejected from bulkhead
await().atMost(5, TimeUnit.SECONDS).until(() -> results.size() == 10);

assertThat(results).haveExactly(6,
new Condition<>("Hello!"::equals, "successful result"));
assertThat(results).haveExactly(4,
new Condition<>(it -> it instanceof BulkheadException, "failed result"));

// 3 immediate invocations + 3 queued invocations
// 2 identical items for each invocation
assertThat(MyService.invocationThreads).hasSize(12);
assertThat(MyService.invocationThreads).allSatisfy(thread -> {
assertThat(thread).contains("vert.x-eventloop");
});
assertThat(MyService.invocationThreads).containsOnly(MyService.invocationThreads.peek());
// 3 immediate calls + 3 queued calls: 4 identical items for each
// 4 rejected calls: 2 identical items for each
assertThat(MyService.currentContexts).hasSize(32);
assertThat(MyService.currentContexts).allMatch(it -> executionStyle == it.executionStyle);
assertThat(MyService.currentContexts).allMatch(ContextDescription::isDuplicatedContext);
assertThat(new HashSet<>(MyService.currentContexts)).hasSize(10);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,22 @@

import org.eclipse.microprofile.faulttolerance.Bulkhead;

import io.smallrye.common.annotation.NonBlocking;
import io.vertx.core.Vertx;
import io.smallrye.faulttolerance.api.AsynchronousNonBlocking;
import io.smallrye.faulttolerance.vertx.ContextDescription;
import io.smallrye.faulttolerance.vertx.VertxContext;

@ApplicationScoped
public class MyService {
static final Queue<String> invocationThreads = new ConcurrentLinkedQueue<>();
static final Queue<ContextDescription> currentContexts = new ConcurrentLinkedQueue<>();

@NonBlocking
@AsynchronousNonBlocking
@Bulkhead(value = 3, waitingTaskQueue = 3)
public CompletionStage<String> hello() {
invocationThreads.add(Thread.currentThread().getName());
currentContexts.add(VertxContext.current().describe());

CompletableFuture<String> result = new CompletableFuture<>();
Vertx.currentContext().owner().setTimer(1000, ignored -> {
invocationThreads.add(Thread.currentThread().getName());
VertxContext.current().setTimer(1000, () -> {
currentContexts.add(VertxContext.current().describe());

result.complete("Hello!");
});
Expand Down
Loading

0 comments on commit eb83fe2

Please sign in to comment.