diff --git a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java index ce1fcc6f1928d..519f76d1ef129 100644 --- a/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/concurrent/FutureUtils.java @@ -587,13 +587,41 @@ public static CompletableFuture runAfterwardsAsync( */ public static CompletableFuture composeAfterwards( CompletableFuture future, Supplier> composedAction) { + return composeAfterwardsInternal(future, composedAction, CompletableFuture::whenComplete); + } + + /** + * Run the given asynchronous action after the completion of the given future. The given future + * can be completed normally or exceptionally. In case of an exceptional completion, the + * asynchronous action's exception will be added to the initial exception. + * + * @param future to wait for its completion + * @param composedAction asynchronous action which is triggered after the future's completion + * @return Future which is completed on the passed {@link Executor} after the asynchronous + * action has completed. This future can contain an exception if an error occurred in the + * given future or asynchronous action. + */ + public static CompletableFuture composeAfterwardsAsync( + CompletableFuture future, + Supplier> composedAction, + Executor executor) { + return composeAfterwardsInternal( + future, + composedAction, + (composedActionFuture, resultFutureCompletion) -> + composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor)); + } + + private static CompletableFuture composeAfterwardsInternal( + CompletableFuture future, + Supplier> composedAction, + BiConsumer, BiConsumer> forwardAction) { final CompletableFuture resultFuture = new CompletableFuture<>(); future.whenComplete( (Object outerIgnored, Throwable outerThrowable) -> { - final CompletableFuture composedActionFuture = composedAction.get(); - - composedActionFuture.whenComplete( + forwardAction.accept( + composedAction.get(), (Object innerIgnored, Throwable innerThrowable) -> { if (innerThrowable != null) { resultFuture.completeExceptionally( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java index 80aab984bfd19..6c8914f65a1cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java @@ -692,12 +692,13 @@ public CompletableFuture closeAsync() { final FlinkException cause = new FlinkException("Scheduler is being stopped."); final CompletableFuture checkpointServicesShutdownFuture = - FutureUtils.composeAfterwards( + FutureUtils.composeAfterwardsAsync( executionGraph .getTerminationFuture() .thenAcceptAsync( this::shutDownCheckpointServices, getMainThreadExecutor()), - checkpointsCleaner::closeAsync); + checkpointsCleaner::closeAsync, + getMainThreadExecutor()); FutureUtils.assertNoException(checkpointServicesShutdownFuture); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java index 31505241fc13b..dec906014573e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveScheduler.java @@ -724,12 +724,15 @@ public CompletableFuture closeAsync() { backgroundTask.abort(); // wait for the background task to finish and then close services - return FutureUtils.composeAfterwards( + return FutureUtils.composeAfterwardsAsync( FutureUtils.runAfterwardsAsync( backgroundTask.getTerminationFuture(), () -> stopCheckpointServicesSafely(jobTerminationFuture.get()), getMainThreadExecutor()), - checkpointsCleaner::closeAsync); + // closing the CheckpointsCleaner can complete in the ioExecutor when cleaning up a + // PendingCheckpoint + checkpointsCleaner::closeAsync, + getMainThreadExecutor()); } private void stopCheckpointServicesSafely(JobStatus terminalState) {