Skip to content

Commit

Permalink
[FLINK-36921][runtime] Fix unstable AdaptiveExecutionPlanSchedulingCo…
Browse files Browse the repository at this point in the history
…ntextTest
  • Loading branch information
JunRuiLee committed Dec 25, 2024
1 parent f8014b6 commit 1e2787a
Showing 1 changed file with 7 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Collections;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;

import static org.apache.flink.runtime.scheduler.SchedulerBase.getDefaultMaxParallelism;
Expand Down Expand Up @@ -142,15 +141,14 @@ private static DefaultAdaptiveExecutionHandler getDefaultAdaptiveExecutionHandle
env.fromSequence(0L, 1L).disableChaining().print();
StreamGraph streamGraph = env.getStreamGraph();

Iterator<StreamNode> iterator = streamGraph.getStreamNodes().iterator();
for (StreamNode streamNode : streamGraph.getStreamNodes()) {
if (streamNode.getOperatorName().contains("Sink")) {
streamNode.setParallelism(sinkParallelism);

iterator.next();
StreamNode sink = iterator.next();

sink.setParallelism(sinkParallelism);

if (sinkMaxParallelism > 0) {
sink.setMaxParallelism(sinkMaxParallelism);
if (sinkMaxParallelism > 0) {
streamNode.setMaxParallelism(sinkMaxParallelism);
}
}
}

return new DefaultAdaptiveExecutionHandler(
Expand Down

0 comments on commit 1e2787a

Please sign in to comment.