Skip to content

Commit

Permalink
[FLINK-34227][runtime] Fixes IO thread leaking into owner of the sche…
Browse files Browse the repository at this point in the history
…duler instance (i.e. the JobMaster)

- Adds test for checking whether the scheduler closing leaks an IO thread via the CheckpointsCleaner to the *SchedulerTests
- Makes CheckpointsCleaner available in AdaptiveBatchSchedulerFactory.createScheduler
  • Loading branch information
XComp committed Feb 4, 2025
1 parent 750d96c commit 5adcab9
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -587,13 +587,41 @@ public static CompletableFuture<Void> runAfterwardsAsync(
*/
public static CompletableFuture<Void> composeAfterwards(
CompletableFuture<?> future, Supplier<CompletableFuture<?>> composedAction) {
return composeAfterwardsInternal(future, composedAction, CompletableFuture::whenComplete);
}

/**
* Run the given asynchronous action after the completion of the given future. The given future
* can be completed normally or exceptionally. In case of an exceptional completion, the
* asynchronous action's exception will be added to the initial exception.
*
* @param future to wait for its completion
* @param composedAction asynchronous action which is triggered after the future's completion
* @return Future which is completed on the passed {@link Executor} after the asynchronous
* action has completed. This future can contain an exception if an error occurred in the
* given future or asynchronous action.
*/
public static CompletableFuture<Void> composeAfterwardsAsync(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
Executor executor) {
return composeAfterwardsInternal(
future,
composedAction,
(composedActionFuture, resultFutureCompletion) ->
composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor));
}

private static CompletableFuture<Void> composeAfterwardsInternal(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
BiConsumer<CompletableFuture<?>, BiConsumer<Object, Throwable>> forwardAction) {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

future.whenComplete(
(Object outerIgnored, Throwable outerThrowable) -> {
final CompletableFuture<?> composedActionFuture = composedAction.get();

composedActionFuture.whenComplete(
forwardAction.accept(
composedAction.get(),
(Object innerIgnored, Throwable innerThrowable) -> {
if (innerThrowable != null) {
resultFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,13 @@ public CompletableFuture<Void> closeAsync() {
final FlinkException cause = new FlinkException("Scheduler is being stopped.");

final CompletableFuture<Void> checkpointServicesShutdownFuture =
FutureUtils.composeAfterwards(
FutureUtils.composeAfterwardsAsync(
executionGraph
.getTerminationFuture()
.thenAcceptAsync(
this::shutDownCheckpointServices, getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
checkpointsCleaner::closeAsync,
getMainThreadExecutor());

FutureUtils.assertNoException(checkpointServicesShutdownFuture);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,15 @@ public CompletableFuture<Void> closeAsync() {

backgroundTask.abort();
// wait for the background task to finish and then close services
return FutureUtils.composeAfterwards(
return FutureUtils.composeAfterwardsAsync(
FutureUtils.runAfterwardsAsync(
backgroundTask.getTerminationFuture(),
() -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
// closing the CheckpointsCleaner can complete in the ioExecutor when cleaning up a
// PendingCheckpoint
checkpointsCleaner::closeAsync,
getMainThreadExecutor());
}

private void stopCheckpointServicesSafely(JobStatus terminalState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public SchedulerNG createInstance(
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
new CheckpointsCleaner(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
Expand Down Expand Up @@ -224,6 +225,7 @@ public static AdaptiveBatchScheduler createScheduler(
ScheduledExecutorService futureExecutor,
ClassLoader userCodeLoader,
CheckpointRecoveryFactory checkpointRecoveryFactory,
CheckpointsCleaner checkpointsCleaner,
Duration rpcTimeout,
BlobWriter blobWriter,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
Expand Down Expand Up @@ -293,7 +295,7 @@ public static AdaptiveBatchScheduler createScheduler(
componentMainThreadExecutor -> {},
delayExecutor,
userCodeLoader,
new CheckpointsCleaner(),
checkpointsCleaner,
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
schedulingStrategyFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpecu
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
checkpointCleaner,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.BiFunctionWithException;

import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;

Expand Down Expand Up @@ -1793,6 +1795,19 @@ void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
}
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
runCloseAsyncCompletesInMainThreadTest(
scheduledExecutorService,
(mainThreadExecutor, checkpointsCleaner) ->
createSchedulerBuilder(
singleJobVertexJobGraph(1),
mainThreadExecutor,
Collections.emptyList())
.setCheckpointCleaner(checkpointsCleaner)
.build());
}

@Test
void testJobStatusHookWithJobFailed() throws Exception {
commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED);
Expand Down Expand Up @@ -1993,6 +2008,61 @@ public synchronized CompletableFuture<Void> closeAsync() {
schedulerClosed.get();
}

// visible to expose test logic to other Scheduler test classes
public static void runCloseAsyncCompletesInMainThreadTest(
ScheduledExecutorService singleThreadExecutorService,
BiFunctionWithException<
ComponentMainThreadExecutor, CheckpointsCleaner, SchedulerNG, Exception>
schedulerFactory)
throws Exception {
final OneShotLatch cleanerCloseLatch = new OneShotLatch();
final CompletableFuture<Void> cleanerCloseFuture = new CompletableFuture<>();
final CheckpointsCleaner checkpointsCleaner =
new CheckpointsCleaner() {
@Override
public CompletableFuture<Void> closeAsync() {
cleanerCloseLatch.trigger();
return cleanerCloseFuture;
}
};

final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
singleThreadExecutorService);
final SchedulerNG scheduler =
schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner);

mainThreadExecutor.execute(scheduler::startScheduling);

final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
mainThreadExecutor.execute(
() -> {
// we shouldn't block the closeAsync call here because it's triggering
// additional tasks on the main thread internally
FutureUtils.forward(
scheduler
.closeAsync()
.thenRun(
() -> {
mainThreadExecutor.assertRunningInMainThread();
}),
closeFuture);
});

// wait for the CheckpointsCleaner#close call to not complete the future prematurely
cleanerCloseLatch.await();

// there is a race condition between returning the future and completing it which is due to
// the fact that we are triggering the latch before returning the future. That gives a small
// chance that the future completion is executed too early causing the future composition to
// end up in the main thread which is what we prevent in this test
Thread.sleep(50);
// completing this future in the test code simulates completing the
// CheckpointCleaner#closeAsync outside the main thread
cleanerCloseFuture.complete(null);
closeFuture.join();
}

private static long initiateFailure(
DefaultScheduler scheduler,
ExecutionAttemptID executionAttemptId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
Expand Down Expand Up @@ -1114,6 +1115,19 @@ void testCloseShutsDownCheckpointingComponents() throws Exception {
assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED);
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest(
TEST_EXECUTOR_RESOURCE.getExecutor(),
(mainThreadExecutor, checkpointsCleaner) ->
new AdaptiveSchedulerBuilder(
singleNoOpJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
.setCheckpointCleaner(checkpointsCleaner)
.build());
}

@Test
void testTransitionToStateCallsOnLeave() throws Exception {
scheduler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFinishedTaskExecutionState;
import static org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
Expand Down Expand Up @@ -386,6 +388,22 @@ void testMergeDynamicParallelismFutures() {
assertThat(mergedSourceParallelismFuture.join()).isEqualTo(4);
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
final ScheduledExecutorService scheduledExecutorServiceForMainThread =
Executors.newSingleThreadScheduledExecutor();
try {
runCloseAsyncCompletesInMainThreadTest(
scheduledExecutorServiceForMainThread,
(mainThread, checkpointsCleaner) ->
createSchedulerBuilder(createJobGraph(), mainThread)
.setCheckpointCleaner(checkpointsCleaner)
.buildAdaptiveBatchJobScheduler());
} finally {
scheduledExecutorServiceForMainThread.shutdownNow();
}
}

void testUserConfiguredMaxParallelism(
int globalMinParallelism,
int globalMaxParallelism,
Expand Down Expand Up @@ -576,6 +594,11 @@ private SchedulerBase createScheduler(
}

private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
return createSchedulerBuilder(jobGraph, mainThreadExecutor);
}

private DefaultSchedulerBuilder createSchedulerBuilder(
JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
return new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setDelayExecutor(taskRestartExecutor);
Expand Down

0 comments on commit 5adcab9

Please sign in to comment.