Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] fix thread pool size adjustment exception (backport #51191) #51209

Merged
merged 2 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -235,5 +235,21 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
}
}
}

public static void setFixedThreadPoolSize(ThreadPoolExecutor executor, int poolSize) {
int coreSize = executor.getCorePoolSize();
if (coreSize == poolSize) { // no change
return;
}
if (coreSize < poolSize) {
// increase the pool size, set the `MaximumPoolSize` first and then the `CoreSize`
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
} else {
// decrease the pool size, set `CoreSize` first and then the `MaximumPoolSize`
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,7 @@ private void createThreadPoolServer() throws TTransportException {
server = new SRTThreadPoolServer(serverArgs);

GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> {
if (threadPoolExecutor.getMaximumPoolSize() != Config.thrift_server_max_worker_threads) {
threadPoolExecutor.setCorePoolSize(Config.thrift_server_max_worker_threads);
threadPoolExecutor.setMaximumPoolSize(Config.thrift_server_max_worker_threads);
}
ThreadPoolManager.setFixedThreadPoolSize(threadPoolExecutor, Config.thrift_server_max_worker_threads);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
// corePoolSize and maximumPoolSize are same.
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
int prePoolSize = executor.getCorePoolSize();
if (poolSize < prePoolSize) {
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
} else {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,7 @@ public int getCorePoolSize() {
}

public void setPoolSize(int poolSize) {
// When the previous poolSize is larger than the poolSize to be set,
// you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa.
// Otherwise, it will throw IllegalArgumentException
int prePoolSize = executor.getCorePoolSize();
if (poolSize < prePoolSize) {
executor.setCorePoolSize(poolSize);
executor.setMaximumPoolSize(poolSize);
} else {
executor.setMaximumPoolSize(poolSize);
executor.setCorePoolSize(poolSize);
}
ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize);
}

private class TaskChecker implements Runnable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,19 +173,7 @@ private void adjustLakeTaskExecutor() {
// DON'T LOG, otherwise the log line will repeat everytime the listener refreshes
return;
}

int oldNumThreads = lakeTaskExecutor.getMaximumPoolSize();
if (oldNumThreads == newNumThreads) {
return;
}

if (newNumThreads < oldNumThreads) { // scale in
lakeTaskExecutor.setCorePoolSize(newNumThreads);
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
} else { // scale out
lakeTaskExecutor.setMaximumPoolSize(newNumThreads);
lakeTaskExecutor.setCorePoolSize(newNumThreads);
}
ThreadPoolManager.setFixedThreadPoolSize(lakeTaskExecutor, newNumThreads);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,35 @@ public void testNormal() throws InterruptedException {
Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount());

}

@Test
public void testSetFixedThreadPoolSize() {
int expectedPoolSize = 2;
ThreadPoolExecutor testPool =
ThreadPoolManager.newDaemonFixedThreadPool(expectedPoolSize, 4096, "testPool", false);
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());

{ // increase the pool size, no problem
expectedPoolSize = 10;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

{ // decrease the pool size, no problem
expectedPoolSize = 5;
int poolSize = expectedPoolSize;
ExceptionChecker.expectThrowsNoException(
() -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize));
Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize());
Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize());
}

// can't set to <= 0
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, 0));
Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, -1));
}
}
Loading