From b9c2ccf1dd5e2ded0eeeb9b186880581a309be31 Mon Sep 17 00:00:00 2001 From: Kevin Cai Date: Fri, 20 Sep 2024 14:59:56 +0800 Subject: [PATCH] [BugFix] fix thread pool size adjustment exception (#51191) Signed-off-by: Kevin Xiaohua Cai (cherry picked from commit 5f3814317e1003a9a8aa885b22267b4b7cbb8496) --- .../starrocks/common/ThreadPoolManager.java | 16 ++++++++++ .../com/starrocks/common/ThriftServer.java | 5 +-- .../starrocks/task/LeaderTaskExecutor.java | 13 +------- .../task/PriorityLeaderTaskExecutor.java | 13 +------- .../transaction/PublishVersionDaemon.java | 14 +-------- .../common/ThreadPoolManagerTest.java | 31 +++++++++++++++++++ 6 files changed, 51 insertions(+), 41 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java index 9e31adefb233e..98cf94a89ed2d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java @@ -235,5 +235,21 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { } } } + + public static void setFixedThreadPoolSize(ThreadPoolExecutor executor, int poolSize) { + int coreSize = executor.getCorePoolSize(); + if (coreSize == poolSize) { // no change + return; + } + if (coreSize < poolSize) { + // increase the pool size, set the `MaximumPoolSize` first and then the `CoreSize` + executor.setMaximumPoolSize(poolSize); + executor.setCorePoolSize(poolSize); + } else { + // decrease the pool size, set `CoreSize` first and then the `MaximumPoolSize` + executor.setCorePoolSize(poolSize); + executor.setMaximumPoolSize(poolSize); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java b/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java index afd06f4373ef7..00106be7281ac 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java @@ -71,10 +71,7 @@ private void createThreadPoolServer() throws TTransportException { server = new SRTThreadPoolServer(serverArgs); GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> { - if (threadPoolExecutor.getMaximumPoolSize() != Config.thrift_server_max_worker_threads) { - threadPoolExecutor.setCorePoolSize(Config.thrift_server_max_worker_threads); - threadPoolExecutor.setMaximumPoolSize(Config.thrift_server_max_worker_threads); - } + ThreadPoolManager.setFixedThreadPoolSize(threadPoolExecutor, Config.thrift_server_max_worker_threads); }); } diff --git a/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java index 74988f8220f96..34f31fede5a8c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java @@ -119,18 +119,7 @@ public int getCorePoolSize() { } public void setPoolSize(int poolSize) { - // corePoolSize and maximumPoolSize are same. - // When the previous poolSize is larger than the poolSize to be set, - // you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa. - // Otherwise, it will throw IllegalArgumentException - int prePoolSize = executor.getCorePoolSize(); - if (poolSize < prePoolSize) { - executor.setCorePoolSize(poolSize); - executor.setMaximumPoolSize(poolSize); - } else { - executor.setMaximumPoolSize(poolSize); - executor.setCorePoolSize(poolSize); - } + ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize); } private class TaskChecker implements Runnable { diff --git a/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java index f143fd7544562..546a85b54e52a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java @@ -111,18 +111,7 @@ public int getCorePoolSize() { } public void setPoolSize(int poolSize) { - // corePoolSize and maximumPoolSize are same. - // When the previous poolSize is larger than the poolSize to be set, - // you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa. - // Otherwise, it will throw IllegalArgumentException - int prePoolSize = executor.getCorePoolSize(); - if (poolSize < prePoolSize) { - executor.setCorePoolSize(poolSize); - executor.setMaximumPoolSize(poolSize); - } else { - executor.setMaximumPoolSize(poolSize); - executor.setCorePoolSize(poolSize); - } + ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize); } private class TaskChecker implements Runnable { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java index 6090b80598420..06e311c20d13d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java @@ -174,19 +174,7 @@ private void adjustLakeTaskExecutor() { // DON'T LOG, otherwise the log line will repeat everytime the listener refreshes return; } - - int oldNumThreads = lakeTaskExecutor.getMaximumPoolSize(); - if (oldNumThreads == newNumThreads) { - return; - } - - if (newNumThreads < oldNumThreads) { // scale in - lakeTaskExecutor.setCorePoolSize(newNumThreads); - lakeTaskExecutor.setMaximumPoolSize(newNumThreads); - } else { // scale out - lakeTaskExecutor.setMaximumPoolSize(newNumThreads); - lakeTaskExecutor.setCorePoolSize(newNumThreads); - } + ThreadPoolManager.setFixedThreadPoolSize(lakeTaskExecutor, newNumThreads); } /** diff --git a/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java index 6e2b99b98892d..f5b4306668509 100755 --- a/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java @@ -87,4 +87,35 @@ public void testNormal() throws InterruptedException { Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount()); } + + @Test + public void testSetFixedThreadPoolSize() { + int expectedPoolSize = 2; + ThreadPoolExecutor testPool = + ThreadPoolManager.newDaemonFixedThreadPool(expectedPoolSize, 4096, "testPool", false); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + + { // increase the pool size, no problem + expectedPoolSize = 10; + int poolSize = expectedPoolSize; + ExceptionChecker.expectThrowsNoException( + () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize)); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + } + + { // decrease the pool size, no problem + expectedPoolSize = 5; + int poolSize = expectedPoolSize; + ExceptionChecker.expectThrowsNoException( + () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize)); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + } + + // can't set to <= 0 + Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, 0)); + Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, -1)); + } }