Skip to content

Commit

Permalink
[FLINK-34227][runtime] Fixes IO thread leaking into owner of the sche…
Browse files Browse the repository at this point in the history
…duler instance (i.e. the JobMaster)
  • Loading branch information
XComp committed Jan 31, 2025
1 parent bca9e8d commit 67e2dc8
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,13 +587,41 @@ public static CompletableFuture<Void> runAfterwardsAsync(
*/
public static CompletableFuture<Void> composeAfterwards(
CompletableFuture<?> future, Supplier<CompletableFuture<?>> composedAction) {
return composeAfterwardsInternal(future, composedAction, CompletableFuture::whenComplete);
}

/**
* Run the given asynchronous action after the completion of the given future. The given future
* can be completed normally or exceptionally. In case of an exceptional completion, the
* asynchronous action's exception will be added to the initial exception.
*
* @param future to wait for its completion
* @param composedAction asynchronous action which is triggered after the future's completion
* @return Future which is completed on the passed {@link Executor} after the asynchronous
* action has completed. This future can contain an exception if an error occurred in the
* given future or asynchronous action.
*/
public static CompletableFuture<Void> composeAfterwardsAsync(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
Executor executor) {
return composeAfterwardsInternal(
future,
composedAction,
(composedActionFuture, resultFutureCompletion) ->
composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor));
}

private static CompletableFuture<Void> composeAfterwardsInternal(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
BiConsumer<CompletableFuture<?>, BiConsumer<Object, Throwable>> forwardAction) {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

future.whenComplete(
(Object outerIgnored, Throwable outerThrowable) -> {
final CompletableFuture<?> composedActionFuture = composedAction.get();

composedActionFuture.whenComplete(
forwardAction.accept(
composedAction.get(),
(Object innerIgnored, Throwable innerThrowable) -> {
if (innerThrowable != null) {
resultFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,13 @@ public CompletableFuture<Void> closeAsync() {
final FlinkException cause = new FlinkException("Scheduler is being stopped.");

final CompletableFuture<Void> checkpointServicesShutdownFuture =
FutureUtils.composeAfterwards(
FutureUtils.composeAfterwardsAsync(
executionGraph
.getTerminationFuture()
.thenAcceptAsync(
this::shutDownCheckpointServices, getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
checkpointsCleaner::closeAsync,
getMainThreadExecutor());

FutureUtils.assertNoException(checkpointServicesShutdownFuture);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,15 @@ public CompletableFuture<Void> closeAsync() {

backgroundTask.abort();
// wait for the background task to finish and then close services
return FutureUtils.composeAfterwards(
return FutureUtils.composeAfterwardsAsync(
FutureUtils.runAfterwardsAsync(
backgroundTask.getTerminationFuture(),
() -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
// closing the CheckpointsCleaner can complete in the ioExecutor when cleaning up a
// PendingCheckpoint
checkpointsCleaner::closeAsync,
getMainThreadExecutor());
}

private void stopCheckpointServicesSafely(JobStatus terminalState) {
Expand Down

0 comments on commit 67e2dc8

Please sign in to comment.