Skip to content

Commit

Permalink
[BugFix] fix thread pool size adjustment exception (#51191)
Browse files Browse the repository at this point in the history
Signed-off-by: Kevin Xiaohua Cai <[email protected]>
(cherry picked from commit 5f38143)

# Conflicts:
#	fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java
  • Loading branch information
kevincai authored and mergify[bot] committed Sep 20, 2024
1 parent 1c3fe2b commit b523da0
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,21 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
}

public static void setFixedThreadPoolSize(ThreadPoolExecutor executor, int poolSize) {
int coreSize = executor.getCorePoolSize();
if (coreSize == poolSize) { // no change
return;
}
if (coreSize < poolSize) {
// increase the pool size, set the `MaximumPoolSize` first and then the `CoreSize`
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
} else {
// decrease the pool size, set `CoreSize` first and then the `MaximumPoolSize`
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ private void createThreadPoolServer() throws TTransportException {
server = new SRTThreadPoolServer(serverArgs);

GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> {
if (threadPoolExecutor.getMaximumPoolSize() != Config.thrift_server_max_worker_threads) {
threadPoolExecutor.setCorePoolSize(Config.thrift_server_max_worker_threads);
threadPoolExecutor.setMaximumPoolSize(Config.thrift_server_max_worker_threads);
}
ThreadPoolManager.setFixedThreadPoolSize(threadPoolExecutor, Config.thrift_server_max_worker_threads);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
// corePoolSize and maximumPoolSize are same.
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
int prePoolSize = executor.getCorePoolSize();
if (poolSize < prePoolSize) {
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
} else {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
<<<<<<< HEAD
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
Expand All @@ -122,6 +123,9 @@ public void setPoolSize(int poolSize) {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
=======
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
>>>>>>> 5f3814317e ([BugFix] fix thread pool size adjustment exception (#51191))
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,7 @@ private void adjustLakeTaskExecutor() {
// DON'T LOG, otherwise the log line will repeat everytime the listener refreshes
return;
}

int oldNumThreads = lakeTaskExecutor.getMaximumPoolSize();
if (oldNumThreads == newNumThreads) {
return;
}

if (newNumThreads < oldNumThreads) { // scale in
lakeTaskExecutor.setCorePoolSize(newNumThreads);
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
} else { // scale out
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
lakeTaskExecutor.setCorePoolSize(newNumThreads);
}
ThreadPoolManager.setFixedThreadPoolSize(lakeTaskExecutor, newNumThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,35 @@ public void testNormal() throws InterruptedException {
Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount());

}

@Test
public void testSetFixedThreadPoolSize() {
int expectedPoolSize = 2;
ThreadPoolExecutor testPool =
ThreadPoolManager.newDaemonFixedThreadPool(expectedPoolSize, 4096, "testPool", false);
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());

{ // increase the pool size, no problem
expectedPoolSize = 10;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

{ // decrease the pool size, no problem
expectedPoolSize = 5;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

// can't set to <= 0
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, 0));
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, -1));
}
}

0 comments on commit b523da0

Please sign in to comment.