From f9d7fb23bd52dde0c73c89792f454e198a9582a0 Mon Sep 17 00:00:00 2001 From: wangweicugw Date: Thu, 27 Jul 2023 10:02:57 +0800 Subject: [PATCH] Change the thread pool parameters to static type. --- .../jd/jdbc/monitor/ThreadPoolCollector.java | 14 ++- .../threadpool/InitThreadPoolService.java | 61 +++++++++++ .../VtRejectedExecutionHandler.java | 2 + .../java/com/jd/jdbc/vitess/VitessDriver.java | 35 +----- .../threadpool/InitConnectionPoolParams.java | 103 ++++++++++++++++++ 5 files changed, 181 insertions(+), 34 deletions(-) create mode 100644 src/main/java/com/jd/jdbc/util/threadpool/InitThreadPoolService.java create mode 100644 src/test/java/com/jd/jdbc/threadpool/InitConnectionPoolParams.java diff --git a/src/main/java/com/jd/jdbc/monitor/ThreadPoolCollector.java b/src/main/java/com/jd/jdbc/monitor/ThreadPoolCollector.java index f7af73b..35a278a 100644 --- a/src/main/java/com/jd/jdbc/monitor/ThreadPoolCollector.java +++ b/src/main/java/com/jd/jdbc/monitor/ThreadPoolCollector.java @@ -16,6 +16,9 @@ package com.jd.jdbc.monitor; + +import com.jd.jdbc.util.threadpool.VtRejectedExecutionHandler; + import io.prometheus.client.Collector; import io.prometheus.client.GaugeMetricFamily; import java.util.Arrays; @@ -49,8 +52,15 @@ public List collect() { createGauge("thread_pool_max_size", "thread.pool.max.size", ThreadPoolExecutor::getMaximumPoolSize), createGauge("thread_pool_active_size", "thread.pool.active.size", ThreadPoolExecutor::getActiveCount), createGauge("thread_pool_thread_count", "thread.pool.thread.count", ThreadPoolExecutor::getPoolSize), - createGauge("thread_pool_queue_size", "thread.pool.queue.size", e -> e.getQueue().size()) - ); + createGauge("thread_pool_queue_size", "thread.pool.queue.size", e -> e.getQueue().size()), + createGauge("thread_pool_queue_remainingCapacity", "thread.pool.queue.remainingCapacity", e -> e.getQueue().remainingCapacity()), + createGauge("thread_pool_vtrejected_handler_timeout", "thread.pool.vtrejected.handler.timeout", e -> { + if (!(e.getRejectedExecutionHandler() instanceof VtRejectedExecutionHandler)) { + return 0; + } + Long timeout = ((VtRejectedExecutionHandler) e.getRejectedExecutionHandler()).getTimeout(); + return Math.toIntExact(timeout); + })); } public void add(final String name, final ThreadPoolExecutor executor) { diff --git a/src/main/java/com/jd/jdbc/util/threadpool/InitThreadPoolService.java b/src/main/java/com/jd/jdbc/util/threadpool/InitThreadPoolService.java new file mode 100644 index 0000000..5e31b09 --- /dev/null +++ b/src/main/java/com/jd/jdbc/util/threadpool/InitThreadPoolService.java @@ -0,0 +1,61 @@ +/* +Copyright 2021 JD Project Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.util.threadpool; + +import com.jd.jdbc.sqlparser.utils.Utils; +import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; +import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService; +import java.util.Properties; + +public class InitThreadPoolService { + + private static InitThreadPoolService instance = new InitThreadPoolService(); + + private InitThreadPoolService() { } + + public static InitThreadPoolService getInstance() { + return instance; + } + + private static Integer queryCorePoolSize; + + private static Integer queryMaximumPoolSize; + + private static Integer queryQueueSize; + + private static Long queryRejectedExecutionTimeoutMillis; + + private static Integer healthCheckCorePoolSize; + + private static Integer healthCheckMaximumPoolSize; + + private static Integer healthCheckQueueSize; + + private static Long healthCheckRejectedExecutionTimeoutMillis; + + static { + Properties prop = System.getProperties(); + queryCorePoolSize = Utils.getInteger(prop, "queryCoreSize"); + queryMaximumPoolSize = Utils.getInteger(prop, "queryMaximumSize"); + queryQueueSize = Utils.getInteger(prop, "queryQueueSize"); + queryRejectedExecutionTimeoutMillis = Utils.getLong(prop, "queryRejectedTimeout"); + healthCheckCorePoolSize = Utils.getInteger(prop, "healthCheckCoreSize"); + healthCheckMaximumPoolSize = Utils.getInteger(prop, "healthCheckMaximumSize"); + healthCheckQueueSize = Utils.getInteger(prop, "healthCheckQueueSize"); + healthCheckRejectedExecutionTimeoutMillis = Utils.getLong(prop, "healthCheckRejectedTimeout"); + VtQueryExecutorService.initialize(queryCorePoolSize, queryMaximumPoolSize, queryQueueSize, queryRejectedExecutionTimeoutMillis); + VtHealthCheckExecutorService.initialize(healthCheckCorePoolSize, healthCheckMaximumPoolSize, healthCheckQueueSize, healthCheckRejectedExecutionTimeoutMillis); + } + +} diff --git a/src/main/java/com/jd/jdbc/util/threadpool/VtRejectedExecutionHandler.java b/src/main/java/com/jd/jdbc/util/threadpool/VtRejectedExecutionHandler.java index ca2373c..72bc52c 100644 --- a/src/main/java/com/jd/jdbc/util/threadpool/VtRejectedExecutionHandler.java +++ b/src/main/java/com/jd/jdbc/util/threadpool/VtRejectedExecutionHandler.java @@ -29,6 +29,7 @@ import java.util.concurrent.Semaphore; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import lombok.Getter; public class VtRejectedExecutionHandler implements RejectedExecutionHandler { private static final Log LOGGER = LogFactory.getLog(VtRejectedExecutionHandler.class); @@ -53,6 +54,7 @@ public class VtRejectedExecutionHandler implements RejectedExecutionHandler { private final String threadPoolName; + @Getter private final Long timeout; public VtRejectedExecutionHandler(final String threadPoolName, final Long timeout) { diff --git a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java index eb0efaa..8d95cfd 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java @@ -32,7 +32,6 @@ import com.jd.jdbc.monitor.MonitorServer; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; -import com.jd.jdbc.sqlparser.utils.Utils; import com.jd.jdbc.srvtopo.ResilientServer; import com.jd.jdbc.srvtopo.ResolvedShard; import com.jd.jdbc.srvtopo.Resolver; @@ -43,8 +42,7 @@ import com.jd.jdbc.tindexes.SplitTableUtil; import com.jd.jdbc.topo.Topo; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; -import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService; +import com.jd.jdbc.util.threadpool.InitThreadPoolService; import com.jd.jdbc.vindexes.hash.BinaryHash; import io.prometheus.client.Histogram; import io.vitess.proto.Topodata; @@ -83,8 +81,6 @@ public class VitessDriver implements java.sql.Driver { private final ReentrantLock lock = new ReentrantLock(); - private volatile boolean inited = false; - public Connection initConnect(String url, Properties info, boolean initOnly) throws SQLException { if (log.isDebugEnabled()) { log.debug("initConnect,url=" + url); @@ -92,12 +88,7 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr Properties prop = VitessJdbcUrlParser.parse(url, info); - this.lock.lock(); - try { - initializePoolSize(prop); - } finally { - this.lock.unlock(); - } + InitThreadPoolService.getInstance(); try { SecurityCenter.INSTANCE.addCredential(prop); @@ -121,7 +112,6 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr for (String cell : cells) { TopologyWatcherManager.INSTANCE.watch(globalContext, cell, defaultKeyspace); } - boolean masterFlag = role.equalsIgnoreCase(Constant.DRIVER_PROPERTY_ROLE_RW); List tabletTypes = masterFlag ? Lists.newArrayList(Topodata.TabletType.MASTER) @@ -219,23 +209,4 @@ public Logger getParentLogger() throws SQLFeatureNotSupportedException { throw new SQLFeatureNotSupportedException(); } - private void initializePoolSize(Properties prop) { - if (this.inited) { - return; - } - - Integer queryCorePoolSize = Utils.getInteger(prop, "queryCoreSize"); - Integer queryMaximumPoolSize = Utils.getInteger(prop, "queryMaximumSize"); - Integer queryQueueSize = Utils.getInteger(prop, "queryQueueSize"); - Long queryRejectedExecutionTimeoutMillis = Utils.getLong(prop, "queryRejectedTimeout"); - VtQueryExecutorService.initialize(queryCorePoolSize, queryMaximumPoolSize, queryQueueSize, queryRejectedExecutionTimeoutMillis); - - Integer healthCheckCorePoolSize = Utils.getInteger(prop, "healthCheckCoreSize"); - Integer healthCheckMaximumPoolSize = Utils.getInteger(prop, "healthCheckMaximumSize"); - Integer healthCheckQueueSize = Utils.getInteger(prop, "healthCheckQueueSize"); - Long healthCheckRejectedExecutionTimeoutMillis = Utils.getLong(prop, "healthCheckRejectedTimeout"); - VtHealthCheckExecutorService.initialize(healthCheckCorePoolSize, healthCheckMaximumPoolSize, healthCheckQueueSize, healthCheckRejectedExecutionTimeoutMillis); - - this.inited = true; - } -} +} \ No newline at end of file diff --git a/src/test/java/com/jd/jdbc/threadpool/InitConnectionPoolParams.java b/src/test/java/com/jd/jdbc/threadpool/InitConnectionPoolParams.java new file mode 100644 index 0000000..1ce6788 --- /dev/null +++ b/src/test/java/com/jd/jdbc/threadpool/InitConnectionPoolParams.java @@ -0,0 +1,103 @@ +/* +Copyright 2021 JD Project Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.threadpool; + +import com.jd.jdbc.monitor.ThreadPoolCollector; +import com.jd.jdbc.util.threadpool.InitThreadPoolService; +import io.prometheus.client.Collector; +import java.util.List; +import static junit.framework.TestCase.fail; +import org.junit.Test; +import testsuite.TestSuite; + +public class InitConnectionPoolParams extends TestSuite { + + @Test + public void testInitThreadPoolParams() { + String queryCoreSize = "5"; + String queryMaximumSize = "101"; + String queryQueueSize = "1001"; + String queryRejectedTimeout = "3001"; + System.setProperty("queryCoreSize", queryCoreSize); + System.setProperty("queryMaximumSize", queryMaximumSize); + System.setProperty("queryQueueSize", queryQueueSize); + System.setProperty("queryRejectedTimeout", queryRejectedTimeout); + System.setProperty("healthCheckCoreSize", queryCoreSize); + System.setProperty("healthCheckMaximumSize", queryMaximumSize); + System.setProperty("healthCheckQueueSize", queryQueueSize); + System.setProperty("healthCheckRejectedTimeout", queryRejectedTimeout); + InitThreadPoolService.getInstance(); + int num = 0; + ThreadPoolCollector t = ThreadPoolCollector.getInstance(); + List collects = t.collect(); + for (Collector.MetricFamilySamples c : collects) { + if (c.name.equals("thread_pool_core_size")) { + List samples = c.samples; + for (Collector.MetricFamilySamples.Sample s : samples) { + if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) { + if (s.value == Double.parseDouble(queryCoreSize)) { + System.out.println(s.labelValues + ":thread_pool_core_size " + s.value); + num++; + } else { + fail(s.labelValues + ":thread_pool_core_size " + s.value); + } + } + } + } else if (c.name.equals("thread_pool_max_size")) { + List samples = c.samples; + for (Collector.MetricFamilySamples.Sample s : samples) { + if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) { + if (s.value == Double.parseDouble(queryMaximumSize)) { + System.out.println(s.labelValues + ":thread_pool_max_size " + s.value); + num++; + } else { + fail(s.labelValues + ":thread_pool_max_size " + s.value); + } + } + } + } else if (c.name.equals("thread_pool_queue_remainingCapacity")) { + List samples = c.samples; + for (Collector.MetricFamilySamples.Sample s : samples) { + if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) { + if (s.value == Double.parseDouble(queryQueueSize)) { + System.out.println(s.labelValues + ":thread_pool_queue_remainingCapacity " + s.value); + num++; + } else { + fail(s.labelValues + ":thread_pool_queue_remainingCapacity " + s.value); + } + } + } + } else if (c.name.equals("thread_pool_vtrejected_handler_timeout")) { + List samples = c.samples; + for (Collector.MetricFamilySamples.Sample s : samples) { + if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) { + if (s.value == Double.parseDouble(queryRejectedTimeout)) { + System.out.println(s.labelValues + ":thread_pool_vtrejected_handler_timeout " + s.value); + num++; + } else { + fail(s.labelValues + ":thread_pool_vtrejected_handler_timeout " + s.value); + } + } + } + } + } + if (num != 8) { + fail("testInitThreadPoolParams is [FAIL]"); + } + } + +}