Skip to content

Commit

Permalink
Change the thread pool parameters to static type.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangweicugw committed Aug 11, 2023
1 parent 68f0db6 commit f9d7fb2
Show file tree
Hide file tree
Showing 5 changed files with 181 additions and 34 deletions.
14 changes: 12 additions & 2 deletions src/main/java/com/jd/jdbc/monitor/ThreadPoolCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.jd.jdbc.monitor;


import com.jd.jdbc.util.threadpool.VtRejectedExecutionHandler;

import io.prometheus.client.Collector;
import io.prometheus.client.GaugeMetricFamily;
import java.util.Arrays;
Expand Down Expand Up @@ -49,8 +52,15 @@ public List<MetricFamilySamples> collect() {
createGauge("thread_pool_max_size", "thread.pool.max.size", ThreadPoolExecutor::getMaximumPoolSize),
createGauge("thread_pool_active_size", "thread.pool.active.size", ThreadPoolExecutor::getActiveCount),
createGauge("thread_pool_thread_count", "thread.pool.thread.count", ThreadPoolExecutor::getPoolSize),
createGauge("thread_pool_queue_size", "thread.pool.queue.size", e -> e.getQueue().size())
);
createGauge("thread_pool_queue_size", "thread.pool.queue.size", e -> e.getQueue().size()),
createGauge("thread_pool_queue_remainingCapacity", "thread.pool.queue.remainingCapacity", e -> e.getQueue().remainingCapacity()),
createGauge("thread_pool_vtrejected_handler_timeout", "thread.pool.vtrejected.handler.timeout", e -> {
if (!(e.getRejectedExecutionHandler() instanceof VtRejectedExecutionHandler)) {
return 0;
}
Long timeout = ((VtRejectedExecutionHandler) e.getRejectedExecutionHandler()).getTimeout();
return Math.toIntExact(timeout);
}));
}

public void add(final String name, final ThreadPoolExecutor executor) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
Copyright 2021 JD Project Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.util.threadpool;

import com.jd.jdbc.sqlparser.utils.Utils;
import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService;
import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService;
import java.util.Properties;

public class InitThreadPoolService {

private static InitThreadPoolService instance = new InitThreadPoolService();

private InitThreadPoolService() { }

public static InitThreadPoolService getInstance() {
return instance;
}

private static Integer queryCorePoolSize;

private static Integer queryMaximumPoolSize;

private static Integer queryQueueSize;

private static Long queryRejectedExecutionTimeoutMillis;

private static Integer healthCheckCorePoolSize;

private static Integer healthCheckMaximumPoolSize;

private static Integer healthCheckQueueSize;

private static Long healthCheckRejectedExecutionTimeoutMillis;

static {
Properties prop = System.getProperties();
queryCorePoolSize = Utils.getInteger(prop, "queryCoreSize");
queryMaximumPoolSize = Utils.getInteger(prop, "queryMaximumSize");
queryQueueSize = Utils.getInteger(prop, "queryQueueSize");
queryRejectedExecutionTimeoutMillis = Utils.getLong(prop, "queryRejectedTimeout");
healthCheckCorePoolSize = Utils.getInteger(prop, "healthCheckCoreSize");
healthCheckMaximumPoolSize = Utils.getInteger(prop, "healthCheckMaximumSize");
healthCheckQueueSize = Utils.getInteger(prop, "healthCheckQueueSize");
healthCheckRejectedExecutionTimeoutMillis = Utils.getLong(prop, "healthCheckRejectedTimeout");
VtQueryExecutorService.initialize(queryCorePoolSize, queryMaximumPoolSize, queryQueueSize, queryRejectedExecutionTimeoutMillis);
VtHealthCheckExecutorService.initialize(healthCheckCorePoolSize, healthCheckMaximumPoolSize, healthCheckQueueSize, healthCheckRejectedExecutionTimeoutMillis);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import lombok.Getter;

public class VtRejectedExecutionHandler implements RejectedExecutionHandler {
private static final Log LOGGER = LogFactory.getLog(VtRejectedExecutionHandler.class);
Expand All @@ -53,6 +54,7 @@ public class VtRejectedExecutionHandler implements RejectedExecutionHandler {

private final String threadPoolName;

@Getter
private final Long timeout;

public VtRejectedExecutionHandler(final String threadPoolName, final Long timeout) {
Expand Down
35 changes: 3 additions & 32 deletions src/main/java/com/jd/jdbc/vitess/VitessDriver.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import com.jd.jdbc.monitor.MonitorServer;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqlparser.utils.Utils;
import com.jd.jdbc.srvtopo.ResilientServer;
import com.jd.jdbc.srvtopo.ResolvedShard;
import com.jd.jdbc.srvtopo.Resolver;
Expand All @@ -43,8 +42,7 @@
import com.jd.jdbc.tindexes.SplitTableUtil;
import com.jd.jdbc.topo.Topo;
import com.jd.jdbc.topo.TopoServer;
import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService;
import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService;
import com.jd.jdbc.util.threadpool.InitThreadPoolService;
import com.jd.jdbc.vindexes.hash.BinaryHash;
import io.prometheus.client.Histogram;
import io.vitess.proto.Topodata;
Expand Down Expand Up @@ -83,21 +81,14 @@ public class VitessDriver implements java.sql.Driver {

private final ReentrantLock lock = new ReentrantLock();

private volatile boolean inited = false;

public Connection initConnect(String url, Properties info, boolean initOnly) throws SQLException {
if (log.isDebugEnabled()) {
log.debug("initConnect,url=" + url);
}

Properties prop = VitessJdbcUrlParser.parse(url, info);

this.lock.lock();
try {
initializePoolSize(prop);
} finally {
this.lock.unlock();
}
InitThreadPoolService.getInstance();

try {
SecurityCenter.INSTANCE.addCredential(prop);
Expand All @@ -121,7 +112,6 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr
for (String cell : cells) {
TopologyWatcherManager.INSTANCE.watch(globalContext, cell, defaultKeyspace);
}

boolean masterFlag = role.equalsIgnoreCase(Constant.DRIVER_PROPERTY_ROLE_RW);
List<Topodata.TabletType> tabletTypes = masterFlag
? Lists.newArrayList(Topodata.TabletType.MASTER)
Expand Down Expand Up @@ -219,23 +209,4 @@ public Logger getParentLogger() throws SQLFeatureNotSupportedException {
throw new SQLFeatureNotSupportedException();
}

private void initializePoolSize(Properties prop) {
if (this.inited) {
return;
}

Integer queryCorePoolSize = Utils.getInteger(prop, "queryCoreSize");
Integer queryMaximumPoolSize = Utils.getInteger(prop, "queryMaximumSize");
Integer queryQueueSize = Utils.getInteger(prop, "queryQueueSize");
Long queryRejectedExecutionTimeoutMillis = Utils.getLong(prop, "queryRejectedTimeout");
VtQueryExecutorService.initialize(queryCorePoolSize, queryMaximumPoolSize, queryQueueSize, queryRejectedExecutionTimeoutMillis);

Integer healthCheckCorePoolSize = Utils.getInteger(prop, "healthCheckCoreSize");
Integer healthCheckMaximumPoolSize = Utils.getInteger(prop, "healthCheckMaximumSize");
Integer healthCheckQueueSize = Utils.getInteger(prop, "healthCheckQueueSize");
Long healthCheckRejectedExecutionTimeoutMillis = Utils.getLong(prop, "healthCheckRejectedTimeout");
VtHealthCheckExecutorService.initialize(healthCheckCorePoolSize, healthCheckMaximumPoolSize, healthCheckQueueSize, healthCheckRejectedExecutionTimeoutMillis);

this.inited = true;
}
}
}
103 changes: 103 additions & 0 deletions src/test/java/com/jd/jdbc/threadpool/InitConnectionPoolParams.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
Copyright 2021 JD Project Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.threadpool;

import com.jd.jdbc.monitor.ThreadPoolCollector;
import com.jd.jdbc.util.threadpool.InitThreadPoolService;
import io.prometheus.client.Collector;
import java.util.List;
import static junit.framework.TestCase.fail;
import org.junit.Test;
import testsuite.TestSuite;

public class InitConnectionPoolParams extends TestSuite {

@Test
public void testInitThreadPoolParams() {
String queryCoreSize = "5";
String queryMaximumSize = "101";
String queryQueueSize = "1001";
String queryRejectedTimeout = "3001";
System.setProperty("queryCoreSize", queryCoreSize);
System.setProperty("queryMaximumSize", queryMaximumSize);
System.setProperty("queryQueueSize", queryQueueSize);
System.setProperty("queryRejectedTimeout", queryRejectedTimeout);
System.setProperty("healthCheckCoreSize", queryCoreSize);
System.setProperty("healthCheckMaximumSize", queryMaximumSize);
System.setProperty("healthCheckQueueSize", queryQueueSize);
System.setProperty("healthCheckRejectedTimeout", queryRejectedTimeout);
InitThreadPoolService.getInstance();
int num = 0;
ThreadPoolCollector t = ThreadPoolCollector.getInstance();
List<Collector.MetricFamilySamples> collects = t.collect();
for (Collector.MetricFamilySamples c : collects) {
if (c.name.equals("thread_pool_core_size")) {
List<Collector.MetricFamilySamples.Sample> samples = c.samples;
for (Collector.MetricFamilySamples.Sample s : samples) {
if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) {
if (s.value == Double.parseDouble(queryCoreSize)) {
System.out.println(s.labelValues + ":thread_pool_core_size " + s.value);
num++;
} else {
fail(s.labelValues + ":thread_pool_core_size " + s.value);
}
}
}
} else if (c.name.equals("thread_pool_max_size")) {
List<Collector.MetricFamilySamples.Sample> samples = c.samples;
for (Collector.MetricFamilySamples.Sample s : samples) {
if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) {
if (s.value == Double.parseDouble(queryMaximumSize)) {
System.out.println(s.labelValues + ":thread_pool_max_size " + s.value);
num++;
} else {
fail(s.labelValues + ":thread_pool_max_size " + s.value);
}
}
}
} else if (c.name.equals("thread_pool_queue_remainingCapacity")) {
List<Collector.MetricFamilySamples.Sample> samples = c.samples;
for (Collector.MetricFamilySamples.Sample s : samples) {
if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) {
if (s.value == Double.parseDouble(queryQueueSize)) {
System.out.println(s.labelValues + ":thread_pool_queue_remainingCapacity " + s.value);
num++;
} else {
fail(s.labelValues + ":thread_pool_queue_remainingCapacity " + s.value);
}
}
}
} else if (c.name.equals("thread_pool_vtrejected_handler_timeout")) {
List<Collector.MetricFamilySamples.Sample> samples = c.samples;
for (Collector.MetricFamilySamples.Sample s : samples) {
if (s.labelValues.get(0).equals("QueryTask-") || s.labelValues.get(0).equals("HealthCheckTask-")) {
if (s.value == Double.parseDouble(queryRejectedTimeout)) {
System.out.println(s.labelValues + ":thread_pool_vtrejected_handler_timeout " + s.value);
num++;
} else {
fail(s.labelValues + ":thread_pool_vtrejected_handler_timeout " + s.value);
}
}
}
}
}
if (num != 8) {
fail("testInitThreadPoolParams is [FAIL]");
}
}

}

0 comments on commit f9d7fb2

Please sign in to comment.