Skip to content

Commit

Permalink
[BugFix] fix thread pool size adjustment exception (backport #51191) (#…
Browse files Browse the repository at this point in the history
…51209)

Signed-off-by: Kevin Xiaohua Cai <[email protected]>
Co-authored-by: Kevin Cai <[email protected]>
  • Loading branch information
mergify[bot] and kevincai authored Sep 20, 2024
1 parent f3070ef commit fb91ade
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,21 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
}

public static void setFixedThreadPoolSize(ThreadPoolExecutor executor, int poolSize) {
int coreSize = executor.getCorePoolSize();
if (coreSize == poolSize) { // no change
return;
}
if (coreSize < poolSize) {
// increase the pool size, set the `MaximumPoolSize` first and then the `CoreSize`
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
} else {
// decrease the pool size, set `CoreSize` first and then the `MaximumPoolSize`
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ private void createThreadPoolServer() throws TTransportException {
server = new SRTThreadPoolServer(serverArgs);

GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> {
if (threadPoolExecutor.getMaximumPoolSize() != Config.thrift_server_max_worker_threads) {
threadPoolExecutor.setCorePoolSize(Config.thrift_server_max_worker_threads);
threadPoolExecutor.setMaximumPoolSize(Config.thrift_server_max_worker_threads);
}
ThreadPoolManager.setFixedThreadPoolSize(threadPoolExecutor, Config.thrift_server_max_worker_threads);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
// corePoolSize and maximumPoolSize are same.
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
int prePoolSize = executor.getCorePoolSize();
if (poolSize < prePoolSize) {
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
} else {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
int prePoolSize = executor.getCorePoolSize();
if (poolSize < prePoolSize) {
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
} else {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,7 @@ private void adjustLakeTaskExecutor() {
// DON'T LOG, otherwise the log line will repeat everytime the listener refreshes
return;
}

int oldNumThreads = lakeTaskExecutor.getMaximumPoolSize();
if (oldNumThreads == newNumThreads) {
return;
}

if (newNumThreads < oldNumThreads) { // scale in
lakeTaskExecutor.setCorePoolSize(newNumThreads);
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
} else { // scale out
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
lakeTaskExecutor.setCorePoolSize(newNumThreads);
}
ThreadPoolManager.setFixedThreadPoolSize(lakeTaskExecutor, newNumThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,35 @@ public void testNormal() throws InterruptedException {
Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount());

}

@Test
public void testSetFixedThreadPoolSize() {
int expectedPoolSize = 2;
ThreadPoolExecutor testPool =
ThreadPoolManager.newDaemonFixedThreadPool(expectedPoolSize, 4096, "testPool", false);
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());

{ // increase the pool size, no problem
expectedPoolSize = 10;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

{ // decrease the pool size, no problem
expectedPoolSize = 5;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

// can't set to <= 0
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, 0));
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, -1));
}
}

0 comments on commit fb91ade

Please sign in to comment.