Skip to content

Commit

Permalink
Make config disabled tests use AsyncTaskManager
Browse files Browse the repository at this point in the history
Previously these tests were using a fixed thread pool executor which
will create unmanaged threads on a Jakarta EE server.
  • Loading branch information
Azquelt committed Jun 26, 2024
1 parent 3543ebc commit 3f0c9b7
Show file tree
Hide file tree
Showing 11 changed files with 155 additions and 274 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
package org.eclipse.microprofile.fault.tolerance.tck.disableEnv;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.util.Barrier;
import org.eclipse.microprofile.fault.tolerance.tck.util.ConcurrentExecutionTracker;
import org.eclipse.microprofile.fault.tolerance.tck.util.TCKConfig;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
Expand Down Expand Up @@ -135,12 +135,10 @@ public void failWithTimeout() {
* the future to wait for
*/
@Bulkhead(2)
public void waitWithBulkhead(Future<?> waitingFuture) {
public void waitWithBulkhead(Barrier barrier) {
try {
tracker.executionStarted();
waitingFuture.get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
barrier.await();
} finally {
tracker.executionEnded();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@

import static org.eclipse.microprofile.fault.tolerance.tck.util.TCKConfig.getConfig;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.config.ConfigAnnotationAsset;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncCaller;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager.BarrierTask;
import org.eclipse.microprofile.fault.tolerance.tck.util.Packages;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
Expand Down Expand Up @@ -85,7 +85,8 @@ public static WebArchive deploy() {
.disable(DisableAnnotationClient.class, "failWithTimeout", Timeout.class)
.disable(DisableAnnotationClient.class, "asyncWaitThenReturn", Asynchronous.class)
.disable(DisableAnnotationClient.class, "failRetryOnceThenFallback", Fallback.class)
.disable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class);
.disable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class)
.enable(AsyncCaller.class, Asynchronous.class); // Needed by AsyncTaskManager

final ConfigAnnotationAsset mpAnnotationConfig = new ConfigAnnotationAsset()
.setValue(DisableAnnotationClient.class, "failWithTimeout", Timeout.class,
Expand Down Expand Up @@ -173,34 +174,22 @@ public void testAsync() throws InterruptedException, ExecutionException {

/**
* Test whether Bulkhead is enabled on {@code waitWithBulkhead()}
*
* @throws InterruptedException
* interrupted
* @throws ExecutionException
* task was aborted
*/
@Test
public void testBulkhead() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void testBulkhead() {

// Start two executions at once
CompletableFuture<Void> waitingFuture = new CompletableFuture<>();
Future<?> result1 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
Future<?> result2 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));

try {
disableClient.waitForBulkheadExecutions(2);
try (AsyncTaskManager taskManager = new AsyncTaskManager()) {
// Start two executions at once
BarrierTask<?> task1 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
BarrierTask<?> task2 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task1.assertAwaits();
task2.assertAwaits();

// Try to start a third execution. This would throw a BulkheadException if Bulkhead is enabled.
// Bulkhead is disabled on the method so no exception expected
disableClient.waitWithBulkhead(CompletableFuture.completedFuture(null));
} finally {
// Clean up executor and first two executions
executor.shutdown();

waitingFuture.complete(null);
result1.get();
result2.get();
BarrierTask<?> task3 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task3.openBarrier();
task3.assertSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
*******************************************************************************/
package org.eclipse.microprofile.fault.tolerance.tck.disableEnv;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncCaller;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager.BarrierTask;
import org.eclipse.microprofile.fault.tolerance.tck.util.Packages;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
Expand Down Expand Up @@ -74,7 +74,8 @@ public static WebArchive deploy() {
.enable(DisableAnnotationClient.class, Timeout.class)
.enable(DisableAnnotationClient.class, Asynchronous.class)
.enable(DisableAnnotationClient.class, Fallback.class)
.enable(DisableAnnotationClient.class, Bulkhead.class);
.enable(DisableAnnotationClient.class, Bulkhead.class)
.enable(AsyncCaller.class, Asynchronous.class); // Needed by AsyncTaskManager

JavaArchive testJar = ShrinkWrap
.create(JavaArchive.class, "ftDisableGlobalEnableClass.jar")
Expand Down Expand Up @@ -155,35 +156,22 @@ public void testAsync() throws InterruptedException, ExecutionException {

/**
* Test whether Bulkhead is enabled on {@code waitWithBulkhead()}
*
* @throws InterruptedException
* interrupted
* @throws ExecutionException
* task was aborted
*/
@Test
public void testBulkhead() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void testBulkhead() {

// Start two executions at once
CompletableFuture<Void> waitingFuture = new CompletableFuture<>();
Future<?> result1 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
Future<?> result2 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));

try {
disableClient.waitForBulkheadExecutions(2);
try (AsyncTaskManager taskManager = new AsyncTaskManager()) {
// Start two executions at once
BarrierTask<?> task1 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
BarrierTask<?> task2 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task1.assertAwaits();
task2.assertAwaits();

// Try to start a third execution. This would throw a BulkheadException if Bulkhead is enabled.
// Bulkhead is enabled on the class, so expect exception
Assert.assertThrows(BulkheadException.class,
() -> disableClient.waitWithBulkhead(CompletableFuture.completedFuture(null)));
} finally {
// Clean up executor and first two executions
executor.shutdown();

waitingFuture.complete(null);
result1.get();
result2.get();
BarrierTask<?> task3 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task3.openBarrier();
task3.assertThrows(BulkheadException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
*******************************************************************************/
package org.eclipse.microprofile.fault.tolerance.tck.disableEnv;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncCaller;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager.BarrierTask;
import org.eclipse.microprofile.fault.tolerance.tck.util.Packages;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
Expand Down Expand Up @@ -74,7 +74,8 @@ public static WebArchive deploy() {
.enable(DisableAnnotationClient.class, "failWithTimeout", Timeout.class)
.enable(DisableAnnotationClient.class, "asyncWaitThenReturn", Asynchronous.class)
.enable(DisableAnnotationClient.class, "failRetryOnceThenFallback", Fallback.class)
.enable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class);
.enable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class)
.enable(AsyncCaller.class, Asynchronous.class); // Needed by AsyncTaskManager

JavaArchive testJar = ShrinkWrap
.create(JavaArchive.class, "ftDisableGloballyEnableMethod.jar")
Expand Down Expand Up @@ -157,36 +158,22 @@ public void testAsync() throws InterruptedException, ExecutionException {

/**
* Test whether Bulkhead is enabled on {@code waitWithBulkhead()}
*
* @throws InterruptedException
* interrupted
* @throws ExecutionException
* task was aborted
*
*/
@Test
public void testBulkhead() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);

// Start two executions at once
CompletableFuture<Void> waitingFuture = new CompletableFuture<>();
Future<?> result1 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
Future<?> result2 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
public void testBulkhead() {

try {
disableClient.waitForBulkheadExecutions(2);
try (AsyncTaskManager taskManager = new AsyncTaskManager()) {
// Start two executions at once
BarrierTask<?> task1 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
BarrierTask<?> task2 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task1.assertAwaits();
task2.assertAwaits();

// Try to start a third execution. This would throw a BulkheadException if Bulkhead is enabled.
// Bulkhead is enabled on the method, so expect exception
Assert.assertThrows(BulkheadException.class,
() -> disableClient.waitWithBulkhead(CompletableFuture.completedFuture(null)));
} finally {
// Clean up executor and first two executions
executor.shutdown();

waitingFuture.complete(null);
result1.get();
result2.get();
BarrierTask<?> task3 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task3.openBarrier();
task3.assertThrows(BulkheadException.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
*******************************************************************************/
package org.eclipse.microprofile.fault.tolerance.tck.disableEnv;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncCaller;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager.BarrierTask;
import org.eclipse.microprofile.fault.tolerance.tck.util.Packages;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
Expand Down Expand Up @@ -69,7 +69,8 @@ public static WebArchive deploy() {
.disable(Timeout.class)
.disable(Asynchronous.class)
.disable(Fallback.class)
.disable(Bulkhead.class);
.disable(Bulkhead.class)
.enable(AsyncCaller.class, Asynchronous.class); // Needed by AsyncTaskManager;

JavaArchive testJar = ShrinkWrap
.create(JavaArchive.class, "ftDisableGlobally.jar")
Expand Down Expand Up @@ -153,34 +154,22 @@ public void testAsync() throws InterruptedException, ExecutionException {

/**
* Test whether Bulkhead is enabled on {@code waitWithBulkhead()}
*
* @throws InterruptedException
* interrupted
* @throws ExecutionException
* task was aborted
*/
@Test
public void testBulkhead() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void testBulkhead() {

// Start two executions at once
CompletableFuture<Void> waitingFuture = new CompletableFuture<>();
Future<?> result1 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
Future<?> result2 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
try (AsyncTaskManager taskManager = new AsyncTaskManager()) {
// Start two executions at once
BarrierTask<?> task1 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
BarrierTask<?> task2 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task1.assertAwaits();
task2.assertAwaits();

try {
disableClient.waitForBulkheadExecutions(2);

// Try to start a third execution. This should throw a BulkheadException if Bulkhead is enabled.
// Try to start a third execution. This would throw a BulkheadException if Bulkhead is enabled.
// Bulkhead is globally disabled so expect no exception
disableClient.waitWithBulkhead(CompletableFuture.completedFuture(null));
} finally {
// Clean up executor and first two executions
executor.shutdown();

waitingFuture.complete(null);
result1.get();
result2.get();
BarrierTask<?> task3 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task3.openBarrier();
task3.assertSuccess();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
*******************************************************************************/
package org.eclipse.microprofile.fault.tolerance.tck.disableEnv;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncCaller;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager;
import org.eclipse.microprofile.fault.tolerance.tck.util.AsyncTaskManager.BarrierTask;
import org.eclipse.microprofile.fault.tolerance.tck.util.Packages;
import org.eclipse.microprofile.fault.tolerance.tck.util.TestException;
import org.eclipse.microprofile.faulttolerance.Asynchronous;
Expand Down Expand Up @@ -74,7 +74,8 @@ public static WebArchive deploy() {
.enable(DisableAnnotationClient.class, "failWithTimeout", Timeout.class)
.enable(DisableAnnotationClient.class, "asyncWaitThenReturn", Asynchronous.class)
.enable(DisableAnnotationClient.class, "failRetryOnceThenFallback", Fallback.class)
.enable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class);
.enable(DisableAnnotationClient.class, "waitWithBulkhead", Bulkhead.class)
.enable(AsyncCaller.class, Asynchronous.class); // Needed by AsyncTaskManager;

JavaArchive testJar = ShrinkWrap
.create(JavaArchive.class, "ftDisableClassEnableMethod.jar")
Expand Down Expand Up @@ -156,35 +157,22 @@ public void testAsync() throws InterruptedException, ExecutionException {

/**
* Test whether Bulkhead is enabled on {@code waitWithBulkhead()}
*
* @throws InterruptedException
* interrupted
* @throws ExecutionException
* task was aborted
*/
@Test
public void testBulkhead() throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(10);
public void testBulkhead() {

// Start two executions at once
CompletableFuture<Void> waitingFuture = new CompletableFuture<>();
Future<?> result1 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));
Future<?> result2 = executor.submit(() -> disableClient.waitWithBulkhead(waitingFuture));

try {
disableClient.waitForBulkheadExecutions(2);
try (AsyncTaskManager taskManager = new AsyncTaskManager()) {
// Start two executions at once
BarrierTask<?> task1 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
BarrierTask<?> task2 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task1.assertAwaits();
task2.assertAwaits();

// Try to start a third execution. This would throw a BulkheadException if Bulkhead is enabled.
// Bulkhead is enabled on the method, so expect exception
Assert.assertThrows(BulkheadException.class,
() -> disableClient.waitWithBulkhead(CompletableFuture.completedFuture(null)));
} finally {
// Clean up executor and first two executions
executor.shutdown();

waitingFuture.complete(null);
result1.get();
result2.get();
BarrierTask<?> task3 = taskManager.runBarrierTask(disableClient::waitWithBulkhead);
task3.openBarrier();
task3.assertThrows(BulkheadException.class);
}
}
}
Loading

0 comments on commit 3f0c9b7

Please sign in to comment.