diff --git a/src/main/java/net/jodah/failsafe/AbstractExecution.java b/src/main/java/net/jodah/failsafe/AbstractExecution.java index 37ff03e6..0aa62327 100644 --- a/src/main/java/net/jodah/failsafe/AbstractExecution.java +++ b/src/main/java/net/jodah/failsafe/AbstractExecution.java @@ -34,11 +34,15 @@ public abstract class AbstractExecution extends ExecutionContext { final FailsafeExecutor executor; final List>> policyExecutors; + enum Status { + NOT_RUNNING, RUNNING, TIMED_OUT + } + // Internally mutable state + /* The status of an execution */ + volatile Status status = Status.NOT_RUNNING; /* Whether the execution attempt has been recorded */ volatile boolean attemptRecorded; - /* Whether the execution result has been recorded */ - volatile boolean executionRecorded; /* Whether a result has been post-executed */ volatile boolean resultHandled; /* Whether the execution can be interrupted */ @@ -73,15 +77,19 @@ public abstract class AbstractExecution extends ExecutionContext { * @throws IllegalStateException if the execution is already complete */ void record(ExecutionResult result) { + record(result, false); + } + + void record(ExecutionResult result, boolean timeout) { Assert.state(!completed, "Execution has already been completed"); if (!interrupted) { recordAttempt(); - if (!executionRecorded) { + if (Status.RUNNING.equals(status)) { + lastResult = result.getResult(); + lastFailure = result.getFailure(); executions.incrementAndGet(); - executionRecorded = true; + status = timeout ? Status.TIMED_OUT : Status.NOT_RUNNING; } - lastResult = result.getResult(); - lastFailure = result.getFailure(); } } @@ -96,12 +104,12 @@ void recordAttempt() { } } - void preExecute() { + synchronized void preExecute() { attemptStartTime = Duration.ofNanos(System.nanoTime()); if (startTime == Duration.ZERO) startTime = attemptStartTime; + status = Status.RUNNING; attemptRecorded = false; - executionRecorded = false; resultHandled = false; cancelledIndex = 0; canInterrupt = true; diff --git a/src/main/java/net/jodah/failsafe/AsyncExecution.java b/src/main/java/net/jodah/failsafe/AsyncExecution.java index 596b153e..5c39ba27 100644 --- a/src/main/java/net/jodah/failsafe/AsyncExecution.java +++ b/src/main/java/net/jodah/failsafe/AsyncExecution.java @@ -21,6 +21,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -46,7 +47,7 @@ AsyncExecution(Scheduler scheduler, FailsafeFuture future, FailsafeExecut void inject(Supplier> syncSupplier, boolean asyncExecution) { if (!asyncExecution) { - outerExecutionSupplier = Functions.getPromiseAsync(syncSupplier, scheduler, future); + outerExecutionSupplier = Functions.getPromiseAsync(syncSupplier, scheduler, this); } else { outerExecutionSupplier = innerExecutionSupplier = Functions.toSettableSupplier(syncSupplier); } @@ -181,8 +182,10 @@ ExecutionResult postExecute(ExecutionResult result) { void executeAsync(boolean asyncExecution) { if (!asyncExecution) outerExecutionSupplier.get().whenComplete(this::complete); - else - future.injectPolicy(scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS)); + else { + Future scheduledSupply = scheduler.schedule(innerExecutionSupplier::get, 0, TimeUnit.NANOSECONDS); + future.injectCancelFn((mayInterrupt, result) -> scheduledSupply.cancel(mayInterrupt)); + } } /** diff --git a/src/main/java/net/jodah/failsafe/FailsafeExecutor.java b/src/main/java/net/jodah/failsafe/FailsafeExecutor.java index 216a757c..5c90dac7 100644 --- a/src/main/java/net/jodah/failsafe/FailsafeExecutor.java +++ b/src/main/java/net/jodah/failsafe/FailsafeExecutor.java @@ -31,7 +31,8 @@ /** *

* An executor that handles failures according to configured {@link FailurePolicy policies}. Can be created via {@link - * Failsafe#with(Policy[])}. + * Failsafe#with(Policy, Policy[])} to support policy based execution failure handling, or {@link Failsafe#none()} to + * support execution with no failure handling. *

* Async executions are run by default on the {@link ForkJoinPool#commonPool()}. Alternative executors can be configured * via {@link #with(ScheduledExecutorService)} and similar methods. All async executions are cancellable and @@ -402,7 +403,7 @@ private T call(Function> supplierFn) { * @throws NullPointerException if the {@code supplierFn} is null * @throws RejectedExecutionException if the {@code supplierFn} cannot be scheduled for execution */ - @SuppressWarnings("unchecked") + @SuppressWarnings({ "unchecked", "rawtypes" }) private CompletableFuture callAsync( Function>> supplierFn, boolean asyncExecution) { FailsafeFuture future = new FailsafeFuture(this); diff --git a/src/main/java/net/jodah/failsafe/FailsafeFuture.java b/src/main/java/net/jodah/failsafe/FailsafeFuture.java index 4263b89c..8846865d 100644 --- a/src/main/java/net/jodah/failsafe/FailsafeFuture.java +++ b/src/main/java/net/jodah/failsafe/FailsafeFuture.java @@ -20,6 +20,7 @@ import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.function.BiConsumer; /** * A CompletableFuture implementation that propagates cancellations and calls completion handlers. @@ -34,9 +35,8 @@ public class FailsafeFuture extends CompletableFuture { private AbstractExecution execution; // Mutable state, guarded by "this" - private Future policyExecFuture; private Future dependentStageFuture; - private Runnable cancelFn; + private BiConsumer cancelFn; private List> timeoutFutures; private boolean cancelWithInterrupt; @@ -72,7 +72,7 @@ public synchronized boolean cancel(boolean mayInterruptIfRunning) { this.cancelWithInterrupt = mayInterruptIfRunning; execution.cancelledIndex = Integer.MAX_VALUE; boolean cancelResult = super.cancel(mayInterruptIfRunning); - cancelResult = cancelDependencies(mayInterruptIfRunning, cancelResult); + cancelDependencies(mayInterruptIfRunning, null); ExecutionResult result = ExecutionResult.failure(new CancellationException()); super.completeExceptionally(result.getFailure()); executor.handleComplete(result, execution); @@ -98,46 +98,31 @@ synchronized boolean completeResult(ExecutionResult result) { return completed; } - synchronized Future getDependency() { - return policyExecFuture; - } - synchronized List> getTimeoutDelegates() { return timeoutFutures; } /** - * Cancels the dependency passing in the {@code interruptDelegate} flag, applies the retry cancel fn, and cancels all + * Cancels the dependency passing in the {@code mayInterrupt} flag, applies the retry cancel fn, and cancels all * timeout dependencies. */ - synchronized boolean cancelDependencies(boolean interruptDelegate, boolean result) { - execution.interrupted = interruptDelegate; - if (policyExecFuture != null) - result = policyExecFuture.cancel(interruptDelegate); + synchronized void cancelDependencies(boolean mayInterrupt, ExecutionResult cancelResult) { + execution.interrupted = mayInterrupt; if (dependentStageFuture != null) - dependentStageFuture.cancel(interruptDelegate); - if (cancelFn != null) - cancelFn.run(); + dependentStageFuture.cancel(mayInterrupt); if (timeoutFutures != null) { for (Future timeoutDelegate : timeoutFutures) timeoutDelegate.cancel(false); timeoutFutures.clear(); } - return result; + if (cancelFn != null) + cancelFn.accept(mayInterrupt, cancelResult); } void inject(AbstractExecution execution) { this.execution = execution; } - /** - * Injects a {@code policyExecFuture} to be cancelled when this future is cancelled. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - synchronized void injectPolicy(Future policyExecFuture) { - this.policyExecFuture = (Future) policyExecFuture; - } - /** * Injects a {@code dependentStageFuture} to be cancelled when this future is cancelled. */ @@ -152,7 +137,7 @@ synchronized void injectStage(Future dependentStageFuture) { /** * Injects a {@code cancelFn} to be called when this future is cancelled. */ - synchronized void injectCancelFn(Runnable cancelFn) { + synchronized void injectCancelFn(BiConsumer cancelFn) { this.cancelFn = cancelFn; } diff --git a/src/main/java/net/jodah/failsafe/FallbackExecutor.java b/src/main/java/net/jodah/failsafe/FallbackExecutor.java index f13f4c53..7ee309f8 100644 --- a/src/main/java/net/jodah/failsafe/FallbackExecutor.java +++ b/src/main/java/net/jodah/failsafe/FallbackExecutor.java @@ -96,12 +96,11 @@ protected Supplier> supplyAsync( else { Future scheduledFallback = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS); - // Propagate cancellation to the scheduled retry and promise - future.injectCancelFn(() -> { - System.out.println("cancelling scheduled fallback isdone: " + scheduledFallback.isDone()); - scheduledFallback.cancel(false); + // Propagate cancellation to the scheduled fallback and its promise + future.injectCancelFn((mayInterrupt, promiseResult) -> { + scheduledFallback.cancel(mayInterrupt); if (executionCancelled()) - promise.complete(null); + promise.complete(promiseResult); }); } } catch (Throwable t) { diff --git a/src/main/java/net/jodah/failsafe/Functions.java b/src/main/java/net/jodah/failsafe/Functions.java index 6a433266..3903be47 100644 --- a/src/main/java/net/jodah/failsafe/Functions.java +++ b/src/main/java/net/jodah/failsafe/Functions.java @@ -15,6 +15,7 @@ */ package net.jodah.failsafe; +import net.jodah.failsafe.AbstractExecution.Status; import net.jodah.failsafe.function.*; import net.jodah.failsafe.internal.util.Assert; import net.jodah.failsafe.util.concurrent.Scheduler; @@ -90,7 +91,8 @@ static Supplier> getPromise(ContextualSup * calls, and returns a promise containing the result. */ static Supplier> getPromiseAsync( - Supplier> supplier, Scheduler scheduler, FailsafeFuture future) { + Supplier> supplier, Scheduler scheduler, AsyncExecution execution) { + AtomicBoolean scheduled = new AtomicBoolean(); return () -> { if (scheduled.get()) { @@ -106,7 +108,16 @@ static Supplier> getPromiseAsync( try { scheduled.set(true); - future.injectPolicy(scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS)); + Future scheduledSupply = scheduler.schedule(callable, 0, TimeUnit.NANOSECONDS); + + // Propagate cancellation to the scheduled supplier and its promise + execution.future.injectCancelFn((mayInterrupt, cancelResult) -> { + scheduledSupply.cancel(mayInterrupt); + + // Cancel a pending promise if the execution is not yet running + if (Status.NOT_RUNNING.equals(execution.status)) + promise.complete(cancelResult); + }); } catch (Throwable t) { promise.completeExceptionally(t); } diff --git a/src/main/java/net/jodah/failsafe/PolicyExecutor.java b/src/main/java/net/jodah/failsafe/PolicyExecutor.java index aba31d82..5e0b88c9 100644 --- a/src/main/java/net/jodah/failsafe/PolicyExecutor.java +++ b/src/main/java/net/jodah/failsafe/PolicyExecutor.java @@ -31,7 +31,8 @@ public abstract class PolicyExecutor

{ protected final P policy; protected final AbstractExecution execution; - /* Index of the policy relative to other policies in a composition, inner-most first */ int policyIndex; + // Index of the policy relative to other policies in a composition, inner-most first + int policyIndex; protected PolicyExecutor(P policy, AbstractExecution execution) { this.policy = policy; diff --git a/src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java b/src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java index 01d93762..2cb45b27 100644 --- a/src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java +++ b/src/main/java/net/jodah/failsafe/RetryPolicyExecutor.java @@ -21,6 +21,7 @@ import java.time.Duration; import java.util.concurrent.Callable; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -114,14 +115,18 @@ public Object call() { supplier.get().whenComplete((result, error) -> { if (error != null) promise.completeExceptionally(error); - else if (result != null) { + else if (result == null) + promise.complete(null); + else { if (retriesExceeded || executionCancelled()) { promise.complete(result); } else { postExecuteAsync(result, scheduler, future).whenComplete((postResult, postError) -> { if (postError != null) promise.completeExceptionally(postError); - else if (postResult != null) { + else if (postResult == null) + promise.complete(null); + else { if (postResult.isComplete() || executionCancelled()) { promise.complete(postResult); } else { @@ -133,11 +138,14 @@ else if (postResult != null) { retryScheduledListener.handle(postResult, execution); previousResult = postResult; - future.injectPolicy(scheduler.schedule(this, postResult.getWaitNanos(), TimeUnit.NANOSECONDS)); - future.injectCancelFn(() -> { - // Ensure that the promise completes if a scheduled retry is cancelled + Future scheduledRetry = scheduler.schedule(this, postResult.getWaitNanos(), + TimeUnit.NANOSECONDS); + + // Propagate cancellation to the scheduled retry and its promise + future.injectCancelFn((mayInterrupt, cancelResult) -> { + scheduledRetry.cancel(mayInterrupt); if (executionCancelled()) - promise.complete(null); + promise.complete(cancelResult); }); } catch (Throwable t) { // Hard scheduling failure diff --git a/src/main/java/net/jodah/failsafe/TimeoutExecutor.java b/src/main/java/net/jodah/failsafe/TimeoutExecutor.java index e0b5fb73..c32737c7 100644 --- a/src/main/java/net/jodah/failsafe/TimeoutExecutor.java +++ b/src/main/java/net/jodah/failsafe/TimeoutExecutor.java @@ -73,7 +73,7 @@ protected Supplier supply(Supplier supplier, S // Guard against race with the execution completing synchronized (execution) { if (execution.canInterrupt) { - execution.record(result.get()); + execution.record(result.get(), true); execution.interrupted = true; executionThread.interrupt(); } @@ -115,17 +115,17 @@ protected Supplier> supplyAsync( try { // Schedule timeout check timeoutFuture.set(Scheduler.DEFAULT.schedule(() -> { - // Guard against race with execution completion - if (executionResult.compareAndSet(null, - ExecutionResult.failure(new TimeoutExceededException(policy)))) { + ExecutionResult cancelResult = ExecutionResult.failure(new TimeoutExceededException(policy)); + // Guard against race with execution completion + if (executionResult.compareAndSet(null, cancelResult)) { boolean canInterrupt = policy.canInterrupt(); if (canInterrupt) - execution.record(executionResult.get()); + execution.record(executionResult.get(), true); // Cancel and interrupt execution.cancelledIndex = policyIndex; - future.cancelDependencies(canInterrupt, false); + future.cancelDependencies(canInterrupt, cancelResult); } return null; }, policy.getTimeout().toNanos(), TimeUnit.NANOSECONDS)); diff --git a/src/test/java/net/jodah/failsafe/AbstractFailsafeTest.java b/src/test/java/net/jodah/failsafe/AbstractFailsafeTest.java index 8577804b..065de051 100644 --- a/src/test/java/net/jodah/failsafe/AbstractFailsafeTest.java +++ b/src/test/java/net/jodah/failsafe/AbstractFailsafeTest.java @@ -347,7 +347,7 @@ public void shouldTimeout() throws Throwable { }; // When / Then - FailsafeExecutor failsafe = Failsafe.with(rp, timeout).onSuccess(e -> { + FailsafeExecutor failsafe = Failsafe.with(rp, timeout).onComplete(e -> { waiter.assertEquals(e.getAttemptCount(), 3); waiter.assertEquals(e.getExecutionCount(), 3); waiter.assertEquals("foo2", e.getResult()); diff --git a/src/test/java/net/jodah/failsafe/AsyncExecutionTest.java b/src/test/java/net/jodah/failsafe/AsyncExecutionTest.java index ef016f1c..0404b2da 100644 --- a/src/test/java/net/jodah/failsafe/AsyncExecutionTest.java +++ b/src/test/java/net/jodah/failsafe/AsyncExecutionTest.java @@ -52,6 +52,7 @@ public void testCompleteForNoResult() { exec = new AsyncExecution(scheduler, future, executorFor(new RetryPolicy<>())); // When + exec.preExecute(); exec.complete(); // Then @@ -68,6 +69,7 @@ public void testCompleteForResult() { exec = new AsyncExecution(scheduler, future, executorFor(new RetryPolicy<>().handleResult(null))); // When / Then + exec.preExecute(); assertFalse(exec.complete(null)); exec.preExecute(); assertTrue(exec.complete(true)); @@ -87,6 +89,7 @@ public void testGetAttemptCount() { exec.inject(Functions.getPromise(ctx -> null, exec), true); // When + exec.preExecute(); exec.retryOn(e); exec.preExecute(); exec.retryOn(e); @@ -102,6 +105,7 @@ public void testRetryForResult() { exec.inject(Functions.getPromise(ctx -> null, exec), true); // When / Then + exec.preExecute(); assertFalse(exec.complete(null)); exec.preExecute(); assertTrue(exec.retryFor(null)); @@ -123,6 +127,7 @@ public void testRetryForResult() { // When / Then resetMocks(); + exec.preExecute(); assertFalse(exec.complete(null)); exec.preExecute(); assertTrue(exec.retryFor(null)); @@ -146,6 +151,7 @@ public void testRetryForResultAndThrowable() { exec.inject(Functions.getPromise(ctx -> null, exec), true); // When / Then + exec.preExecute(); assertFalse(exec.complete(null)); exec.preExecute(); assertTrue(exec.retryFor(null, null)); @@ -169,6 +175,7 @@ public void testRetryForResultAndThrowable() { // When / Then resetMocks(); + exec.preExecute(); assertFalse(exec.complete(null)); exec.preExecute(); assertTrue(exec.retryFor(null, e)); @@ -192,6 +199,7 @@ public void testRetryOn() { exec.inject(Functions.getPromise(ctx -> null, exec), true); // When / Then + exec.preExecute(); assertTrue(exec.retryOn(new IllegalArgumentException())); exec.preExecute(); assertFalse(exec.retryOn(e)); @@ -211,6 +219,7 @@ public void testRetryOn() { // When / Then resetMocks(); + exec.preExecute(); assertTrue(exec.retryOn(e)); exec.preExecute(); assertFalse(exec.retryOn(e)); @@ -239,6 +248,7 @@ public void testCompleteOrRetry() { exec.inject(Functions.getPromise(ctx -> null, exec), true); // When / Then + exec.preExecute(); exec.completeOrHandle(null, e); assertFalse(exec.isComplete()); exec.preExecute(); diff --git a/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java b/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java index 94ff3d49..ee2bc51b 100644 --- a/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java +++ b/src/test/java/net/jodah/failsafe/AsyncFailsafeTest.java @@ -28,7 +28,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import static net.jodah.failsafe.Asserts.assertThrows; @@ -304,7 +303,7 @@ public void shouldCompleteFutureExternally() throws Throwable { */ public void shouldCancelNestedTimeoutsWithInterrupt() throws Throwable { // Given - RetryPolicy rp = new RetryPolicy().withMaxRetries(2); + RetryPolicy rp = new RetryPolicy().onRetry(e -> System.out.println("Retrying")); Timeout timeout1 = Timeout.of(Duration.ofMillis(1000)); Timeout timeout2 = Timeout.of(Duration.ofMillis(200)).withInterrupt(true); AtomicReference> futureRef = new AtomicReference<>(); @@ -319,20 +318,16 @@ public void shouldCancelNestedTimeoutsWithInterrupt() throws Throwable { // Wait for futureRef to be set futureLatch.await(); waiter.assertTrue(ctx.getLastFailure() == null || ctx.getLastFailure() instanceof TimeoutExceededException); - Consumer asserts = (expected) -> { - waiter.assertEquals(expected, ctx.isCancelled()); - waiter.assertEquals(expected, futureRef.get().getDependency().isCancelled()); - if (!futureRef.get().getTimeoutDelegates().isEmpty()) - waiter.assertEquals(expected, futureRef.get().getTimeoutDelegates().stream().allMatch(Future::isCancelled)); - }; try { // Assert not cancelled - asserts.accept(false); + waiter.assertFalse(ctx.isCancelled()); + if (!futureRef.get().getTimeoutDelegates().isEmpty()) + waiter.assertFalse(futureRef.get().getTimeoutDelegates().stream().allMatch(Future::isCancelled)); Thread.sleep(1000); } catch (InterruptedException e) { // Assert cancelled - asserts.accept(true); + waiter.assertTrue(ctx.isCancelled()); waiter.resume(); throw e; } @@ -367,7 +362,6 @@ private void assertCancel(Function, Future> executorCalla // Then assertTrue(future.isCancelled()); - assertTrue(future.getDependency().isCancelled()); assertTrue( future.getTimeoutDelegates() == null || future.getTimeoutDelegates().stream().allMatch(Future::isCancelled)); assertTrue(future.isDone()); diff --git a/src/test/java/net/jodah/failsafe/Testing.java b/src/test/java/net/jodah/failsafe/Testing.java index f6e11463..32fa7c8e 100644 --- a/src/test/java/net/jodah/failsafe/Testing.java +++ b/src/test/java/net/jodah/failsafe/Testing.java @@ -38,6 +38,13 @@ public class Testing { public static class ConnectException extends RuntimeException { } + + public interface Service { + boolean connect(); + + boolean disconnect(); + } + public static class SyncExecutor implements Executor { @Override public void execute(Runnable command) { @@ -76,12 +83,6 @@ public void reset() { } } - public interface Service { - boolean connect(); - - boolean disconnect(); - } - public static Throwable getThrowable(CheckedRunnable runnable) { try { runnable.run(); @@ -279,6 +280,16 @@ public static void sneakyThrow(Throwable e) throws E { throw (E) e; } + public static Runnable uncheck(CheckedRunnable runnable) { + return () -> { + try { + runnable.run(); + } catch (Throwable e) { + throw new RuntimeException(e); + } + }; + } + public static CircuitBreakerInternals getInternals(CircuitBreaker circuitBreaker) { try { Field internalsField = CircuitBreaker.class.getDeclaredField("internals"); diff --git a/src/test/java/net/jodah/failsafe/issues/Issue231Test.java b/src/test/java/net/jodah/failsafe/issues/Issue231Test.java new file mode 100644 index 00000000..f49a7ab9 --- /dev/null +++ b/src/test/java/net/jodah/failsafe/issues/Issue231Test.java @@ -0,0 +1,33 @@ +package net.jodah.failsafe.issues; + +import net.jodah.failsafe.Asserts; +import net.jodah.failsafe.Failsafe; +import net.jodah.failsafe.Timeout; +import net.jodah.failsafe.TimeoutExceededException; +import org.testng.annotations.Test; + +import java.time.Duration; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.testng.Assert.assertTrue; + +@Test +public class Issue231Test { + /** + * Timeout, even with interruption, should wait for the execution to complete. + */ + public void shouldWaitForExecutionCompletion() { + Timeout timeout = Timeout.of(Duration.ofMillis(100)).withInterrupt(true); + AtomicBoolean executionCompleted = new AtomicBoolean(); + Asserts.assertThrows(() -> Failsafe.with(timeout).runAsync(() -> { + try { + Thread.sleep(1000); + } catch (InterruptedException ignore) { + Thread.sleep(200); + executionCompleted.set(true); + } + }).get(), ExecutionException.class, TimeoutExceededException.class); + assertTrue(executionCompleted.get()); + } +} diff --git a/src/test/java/net/jodah/failsafe/issues/Issue260Test.java b/src/test/java/net/jodah/failsafe/issues/Issue260Test.java index ae398ca9..3dc82586 100644 --- a/src/test/java/net/jodah/failsafe/issues/Issue260Test.java +++ b/src/test/java/net/jodah/failsafe/issues/Issue260Test.java @@ -10,6 +10,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.Function; @Test @@ -35,8 +36,8 @@ public void test() throws Throwable { Future f1 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(1)); Future f2 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(2)); Future f3 = Failsafe.with(rp, timeout).with(executor).runAsync(task.apply(3)); - f1.get(); - f2.get(); - f3.get(); + f1.get(1, TimeUnit.SECONDS); + f2.get(1, TimeUnit.SECONDS); + f3.get(1, TimeUnit.SECONDS); } }