From b523da0f6e836ff982aecfc52e53deb9996e5bc7 Mon Sep 17 00:00:00 2001 From: Kevin Cai Date: Fri, 20 Sep 2024 14:59:56 +0800 Subject: [PATCH] [BugFix] fix thread pool size adjustment exception (#51191) Signed-off-by: Kevin Xiaohua Cai (cherry picked from commit 5f3814317e1003a9a8aa885b22267b4b7cbb8496) # Conflicts: # fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java --- .../starrocks/common/ThreadPoolManager.java | 16 ++++++++++ .../com/starrocks/common/ThriftServer.java | 5 +-- .../starrocks/task/LeaderTaskExecutor.java | 13 +------- .../task/PriorityLeaderTaskExecutor.java | 4 +++ .../transaction/PublishVersionDaemon.java | 14 +-------- .../common/ThreadPoolManagerTest.java | 31 +++++++++++++++++++ 6 files changed, 54 insertions(+), 29 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java b/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java index 9e31adefb233e..98cf94a89ed2d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ThreadPoolManager.java @@ -235,5 +235,21 @@ public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { } } } + + public static void setFixedThreadPoolSize(ThreadPoolExecutor executor, int poolSize) { + int coreSize = executor.getCorePoolSize(); + if (coreSize == poolSize) { // no change + return; + } + if (coreSize < poolSize) { + // increase the pool size, set the `MaximumPoolSize` first and then the `CoreSize` + executor.setMaximumPoolSize(poolSize); + executor.setCorePoolSize(poolSize); + } else { + // decrease the pool size, set `CoreSize` first and then the `MaximumPoolSize` + executor.setCorePoolSize(poolSize); + executor.setMaximumPoolSize(poolSize); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java b/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java index ad9a14cbdffa7..83d250d168dc1 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/ThriftServer.java @@ -71,10 +71,7 @@ private void createThreadPoolServer() throws TTransportException { server = new SRTThreadPoolServer(serverArgs); GlobalStateMgr.getCurrentState().getConfigRefreshDaemon().registerListener(() -> { - if (threadPoolExecutor.getMaximumPoolSize() != Config.thrift_server_max_worker_threads) { - threadPoolExecutor.setCorePoolSize(Config.thrift_server_max_worker_threads); - threadPoolExecutor.setMaximumPoolSize(Config.thrift_server_max_worker_threads); - } + ThreadPoolManager.setFixedThreadPoolSize(threadPoolExecutor, Config.thrift_server_max_worker_threads); }); } diff --git a/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java index 74988f8220f96..34f31fede5a8c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/LeaderTaskExecutor.java @@ -119,18 +119,7 @@ public int getCorePoolSize() { } public void setPoolSize(int poolSize) { - // corePoolSize and maximumPoolSize are same. - // When the previous poolSize is larger than the poolSize to be set, - // you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa. - // Otherwise, it will throw IllegalArgumentException - int prePoolSize = executor.getCorePoolSize(); - if (poolSize < prePoolSize) { - executor.setCorePoolSize(poolSize); - executor.setMaximumPoolSize(poolSize); - } else { - executor.setMaximumPoolSize(poolSize); - executor.setCorePoolSize(poolSize); - } + ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize); } private class TaskChecker implements Runnable { diff --git a/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java b/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java index e3d736333b670..053e3c8134f70 100644 --- a/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java +++ b/fe/fe-core/src/main/java/com/starrocks/task/PriorityLeaderTaskExecutor.java @@ -111,6 +111,7 @@ public int getCorePoolSize() { } public void setPoolSize(int poolSize) { +<<<<<<< HEAD // When the previous poolSize is larger than the poolSize to be set, // you need to setCorePoolSize first and then setMaximumPoolSize, and vice versa. // Otherwise, it will throw IllegalArgumentException @@ -122,6 +123,9 @@ public void setPoolSize(int poolSize) { executor.setMaximumPoolSize(poolSize); executor.setCorePoolSize(poolSize); } +======= + ThreadPoolManager.setFixedThreadPoolSize(executor, poolSize); +>>>>>>> 5f3814317e ([BugFix] fix thread pool size adjustment exception (#51191)) } private class TaskChecker implements Runnable { diff --git a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java index 6adfc05ec594b..79ac97abdb307 100644 --- a/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java +++ b/fe/fe-core/src/main/java/com/starrocks/transaction/PublishVersionDaemon.java @@ -173,19 +173,7 @@ private void adjustLakeTaskExecutor() { // DON'T LOG, otherwise the log line will repeat everytime the listener refreshes return; } - - int oldNumThreads = lakeTaskExecutor.getMaximumPoolSize(); - if (oldNumThreads == newNumThreads) { - return; - } - - if (newNumThreads < oldNumThreads) { // scale in - lakeTaskExecutor.setCorePoolSize(newNumThreads); - lakeTaskExecutor.setMaximumPoolSize(newNumThreads); - } else { // scale out - lakeTaskExecutor.setMaximumPoolSize(newNumThreads); - lakeTaskExecutor.setCorePoolSize(newNumThreads); - } + ThreadPoolManager.setFixedThreadPoolSize(lakeTaskExecutor, newNumThreads); } /** diff --git a/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java index 6e2b99b98892d..f5b4306668509 100755 --- a/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/common/ThreadPoolManagerTest.java @@ -87,4 +87,35 @@ public void testNormal() throws InterruptedException { Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount()); } + + @Test + public void testSetFixedThreadPoolSize() { + int expectedPoolSize = 2; + ThreadPoolExecutor testPool = + ThreadPoolManager.newDaemonFixedThreadPool(expectedPoolSize, 4096, "testPool", false); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + + { // increase the pool size, no problem + expectedPoolSize = 10; + int poolSize = expectedPoolSize; + ExceptionChecker.expectThrowsNoException( + () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize)); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + } + + { // decrease the pool size, no problem + expectedPoolSize = 5; + int poolSize = expectedPoolSize; + ExceptionChecker.expectThrowsNoException( + () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, poolSize)); + Assert.assertEquals(expectedPoolSize, testPool.getCorePoolSize()); + Assert.assertEquals(expectedPoolSize, testPool.getMaximumPoolSize()); + } + + // can't set to <= 0 + Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, 0)); + Assert.assertThrows(IllegalArgumentException.class, () -> ThreadPoolManager.setFixedThreadPoolSize(testPool, -1)); + } }