Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-34227][runtime] Ensure that the scheduler close call doesn't leak IO threads into the JobMaster #26095

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -587,13 +587,41 @@ public static CompletableFuture<Void> runAfterwardsAsync(
*/
public static CompletableFuture<Void> composeAfterwards(
CompletableFuture<?> future, Supplier<CompletableFuture<?>> composedAction) {
return composeAfterwardsInternal(future, composedAction, CompletableFuture::whenComplete);
}

/**
* Run the given asynchronous action after the completion of the given future. The given future
* can be completed normally or exceptionally. In case of an exceptional completion, the
* asynchronous action's exception will be added to the initial exception.
*
* @param future to wait for its completion
* @param composedAction asynchronous action which is triggered after the future's completion
* @return Future which is completed on the passed {@link Executor} after the asynchronous
* action has completed. This future can contain an exception if an error occurred in the
* given future or asynchronous action.
*/
public static CompletableFuture<Void> composeAfterwardsAsync(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
Executor executor) {
return composeAfterwardsInternal(
future,
composedAction,
(composedActionFuture, resultFutureCompletion) ->
composedActionFuture.whenCompleteAsync(resultFutureCompletion, executor));
}

private static CompletableFuture<Void> composeAfterwardsInternal(
CompletableFuture<?> future,
Supplier<CompletableFuture<?>> composedAction,
BiConsumer<CompletableFuture<?>, BiConsumer<Object, Throwable>> forwardAction) {
final CompletableFuture<Void> resultFuture = new CompletableFuture<>();

future.whenComplete(
(Object outerIgnored, Throwable outerThrowable) -> {
final CompletableFuture<?> composedActionFuture = composedAction.get();

composedActionFuture.whenComplete(
forwardAction.accept(
composedAction.get(),
(Object innerIgnored, Throwable innerThrowable) -> {
if (innerThrowable != null) {
resultFuture.completeExceptionally(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,8 @@ private CompletableFuture<Void> stopJobExecution(final Exception cause) {
return FutureUtils.runAfterwards(
terminationFuture,
() -> {
getMainThreadExecutor().assertRunningInMainThread();

shuffleMaster.unregisterJob(executionPlan.getJobID());
disconnectTaskManagerResourceManagerConnections(cause);
stopJobMasterServices();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -692,12 +692,13 @@ public CompletableFuture<Void> closeAsync() {
final FlinkException cause = new FlinkException("Scheduler is being stopped.");

final CompletableFuture<Void> checkpointServicesShutdownFuture =
FutureUtils.composeAfterwards(
FutureUtils.composeAfterwardsAsync(
executionGraph
.getTerminationFuture()
.thenAcceptAsync(
this::shutDownCheckpointServices, getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
checkpointsCleaner::closeAsync,
getMainThreadExecutor());

FutureUtils.assertNoException(checkpointServicesShutdownFuture);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,12 +724,15 @@ public CompletableFuture<Void> closeAsync() {

backgroundTask.abort();
// wait for the background task to finish and then close services
return FutureUtils.composeAfterwards(
return FutureUtils.composeAfterwardsAsync(
FutureUtils.runAfterwardsAsync(
backgroundTask.getTerminationFuture(),
() -> stopCheckpointServicesSafely(jobTerminationFuture.get()),
getMainThreadExecutor()),
checkpointsCleaner::closeAsync);
// closing the CheckpointsCleaner can complete in the ioExecutor when cleaning up a
// PendingCheckpoint
checkpointsCleaner::closeAsync,
getMainThreadExecutor());
}

private void stopCheckpointServicesSafely(JobStatus terminalState) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ public SchedulerNG createInstance(
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
new CheckpointsCleaner(),
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
Expand Down Expand Up @@ -224,6 +225,7 @@ public static AdaptiveBatchScheduler createScheduler(
ScheduledExecutorService futureExecutor,
ClassLoader userCodeLoader,
CheckpointRecoveryFactory checkpointRecoveryFactory,
CheckpointsCleaner checkpointsCleaner,
Duration rpcTimeout,
BlobWriter blobWriter,
JobManagerJobMetricGroup jobManagerJobMetricGroup,
Expand Down Expand Up @@ -293,7 +295,7 @@ public static AdaptiveBatchScheduler createScheduler(
componentMainThreadExecutor -> {},
delayExecutor,
userCodeLoader,
new CheckpointsCleaner(),
checkpointsCleaner,
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
schedulingStrategyFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,7 @@ public AdaptiveBatchScheduler buildAdaptiveBatchJobScheduler(boolean enableSpecu
futureExecutor,
userCodeLoader,
checkpointRecoveryFactory,
checkpointCleaner,
rpcTimeout,
blobWriter,
jobManagerJobMetricGroup,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,10 @@
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import org.apache.flink.util.function.BiFunctionWithException;

import org.apache.flink.shaded.guava32.com.google.common.collect.Iterables;

Expand Down Expand Up @@ -1793,6 +1795,19 @@ void testCheckpointCleanerIsClosedAfterCheckpointServices() throws Exception {
}
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
runCloseAsyncCompletesInMainThreadTest(
scheduledExecutorService,
(mainThreadExecutor, checkpointsCleaner) ->
createSchedulerBuilder(
singleJobVertexJobGraph(1),
mainThreadExecutor,
Collections.emptyList())
.setCheckpointCleaner(checkpointsCleaner)
.build());
}

@Test
void testJobStatusHookWithJobFailed() throws Exception {
commonJobStatusHookTest(ExecutionState.FAILED, JobStatus.FAILED);
Expand Down Expand Up @@ -1993,6 +2008,61 @@ public synchronized CompletableFuture<Void> closeAsync() {
schedulerClosed.get();
}

// visible to expose test logic to other Scheduler test classes
public static void runCloseAsyncCompletesInMainThreadTest(
ScheduledExecutorService singleThreadExecutorService,
BiFunctionWithException<
ComponentMainThreadExecutor, CheckpointsCleaner, SchedulerNG, Exception>
schedulerFactory)
throws Exception {
final OneShotLatch cleanerCloseLatch = new OneShotLatch();
final CompletableFuture<Void> cleanerCloseFuture = new CompletableFuture<>();
final CheckpointsCleaner checkpointsCleaner =
new CheckpointsCleaner() {
@Override
public CompletableFuture<Void> closeAsync() {
cleanerCloseLatch.trigger();
return cleanerCloseFuture;
}
};

final ComponentMainThreadExecutor mainThreadExecutor =
ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(
singleThreadExecutorService);
final SchedulerNG scheduler =
schedulerFactory.apply(mainThreadExecutor, checkpointsCleaner);

mainThreadExecutor.execute(scheduler::startScheduling);

final CompletableFuture<Void> closeFuture = new CompletableFuture<>();
mainThreadExecutor.execute(
() -> {
// we shouldn't block the closeAsync call here because it's triggering
// additional tasks on the main thread internally
FutureUtils.forward(
scheduler
.closeAsync()
.thenRun(
() -> {
mainThreadExecutor.assertRunningInMainThread();
}),
closeFuture);
});

// wait for the CheckpointsCleaner#close call to not complete the future prematurely
cleanerCloseLatch.await();

// there is a race condition between returning the future and completing it which is due to
// the fact that we are triggering the latch before returning the future. That gives a small
// chance that the future completion is executed too early causing the future composition to
// end up in the main thread which is what we prevent in this test
Thread.sleep(50);
// completing this future in the test code simulates completing the
// CheckpointCleaner#closeAsync outside the main thread
cleanerCloseFuture.complete(null);
closeFuture.join();
}

private static long initiateFailure(
DefaultScheduler scheduler,
ExecutionAttemptID executionAttemptId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import static org.apache.flink.core.testutils.FlinkAssertions.assertThatFuture;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId;
import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createNoOpVertex;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.singleNoOpJobGraph;
import static org.apache.flink.runtime.jobgraph.JobGraphTestUtils.streamingJobGraph;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.createSlotOffersForResourceRequirements;
import static org.apache.flink.runtime.jobmaster.slotpool.SlotPoolTestUtils.offerSlots;
Expand Down Expand Up @@ -1114,6 +1115,19 @@ void testCloseShutsDownCheckpointingComponents() throws Exception {
assertThat(checkpointIdCounterShutdownFuture.get()).isEqualTo(JobStatus.FAILED);
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest(
TEST_EXECUTOR_RESOURCE.getExecutor(),
(mainThreadExecutor, checkpointsCleaner) ->
new AdaptiveSchedulerBuilder(
singleNoOpJobGraph(),
mainThreadExecutor,
EXECUTOR_RESOURCE.getExecutor())
.setCheckpointCleaner(checkpointsCleaner)
.build());
}

@Test
void testTransitionToStateCallsOnLeave() throws Exception {
scheduler =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,13 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import static org.apache.flink.runtime.scheduler.DefaultSchedulerBuilder.createCustomParallelismDecider;
import static org.apache.flink.runtime.scheduler.DefaultSchedulerTest.runCloseAsyncCompletesInMainThreadTest;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFailedTaskExecutionState;
import static org.apache.flink.runtime.scheduler.SchedulerTestingUtils.createFinishedTaskExecutionState;
import static org.apache.flink.runtime.scheduler.adaptivebatch.DefaultVertexParallelismAndInputInfosDeciderTest.createDecider;
Expand Down Expand Up @@ -386,6 +388,22 @@ void testMergeDynamicParallelismFutures() {
assertThat(mergedSourceParallelismFuture.join()).isEqualTo(4);
}

@Test
void testCloseAsyncReturnsMainThreadFuture() throws Exception {
final ScheduledExecutorService scheduledExecutorServiceForMainThread =
Executors.newSingleThreadScheduledExecutor();
try {
runCloseAsyncCompletesInMainThreadTest(
scheduledExecutorServiceForMainThread,
(mainThread, checkpointsCleaner) ->
createSchedulerBuilder(createJobGraph(), mainThread)
.setCheckpointCleaner(checkpointsCleaner)
.buildAdaptiveBatchJobScheduler());
} finally {
scheduledExecutorServiceForMainThread.shutdownNow();
}
}

void testUserConfiguredMaxParallelism(
int globalMinParallelism,
int globalMaxParallelism,
Expand Down Expand Up @@ -576,6 +594,11 @@ private SchedulerBase createScheduler(
}

private DefaultSchedulerBuilder createSchedulerBuilder(JobGraph jobGraph) {
return createSchedulerBuilder(jobGraph, mainThreadExecutor);
}

private DefaultSchedulerBuilder createSchedulerBuilder(
JobGraph jobGraph, ComponentMainThreadExecutor mainThreadExecutor) {
return new DefaultSchedulerBuilder(
jobGraph, mainThreadExecutor, EXECUTOR_RESOURCE.getExecutor())
.setDelayExecutor(taskRestartExecutor);
Expand Down