From a205ef77421508863ea4399b3537b482b2484cae Mon Sep 17 00:00:00 2001 From: wangweicugw <38103831+wangweicugw@users.noreply.github.com> Date: Sat, 9 Sep 2023 18:33:22 +0800 Subject: [PATCH] The establishment time and execution time for accessing the topology can be configured. (#136) --- docs/properties.md | 20 ++++-- .../jdbc/topo/etcd2topo/Etcd2TopoFactory.java | 12 +++- .../jdbc/topo/etcd2topo/Etcd2TopoServer.java | 10 +-- .../com/jd/jdbc/discovery/MockTablet.java | 18 +++++ .../topo/etcd2topo/Etcd2TopoFactoryTest.java | 50 +++++++++++++ .../topo/etcd2topo/Etcd2TopoServerTest.java | 71 +++++++++++++++++++ 6 files changed, 168 insertions(+), 13 deletions(-) create mode 100644 src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactoryTest.java create mode 100644 src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java diff --git a/docs/properties.md b/docs/properties.md index 1ee479b..ea2b0c7 100644 --- a/docs/properties.md +++ b/docs/properties.md @@ -71,8 +71,18 @@ ### VtDriver中的系统参数 通过JVM参数方式传入,比如-Dvtdriver.api.port=9999 -| 属性 | 数据类型 | 默认值 | 备注 | -|---|---|---|---| -| vtdriver.api.port | int | 15002 | 指定开启的http端口 | -| vtdriver.monitor.port | int | 15001 | 指定开启的http端口(prometheus) | -| vtdriver.secondsBehindMaster | int | 7200 | 指定HealthCheck中判定tablet为可用的最大主从延迟,默认值为7200s | \ No newline at end of file +| 属性 | 数据类型 | 默认值 | 备注 | +|---|---|-------------------------------------------------------------------------------------|--------------------------------------------| +| vtdriver.api.port | int | 15002 | 指定开启的http端口 | +| vtdriver.monitor.port | int | 15001 | 指定开启的http端口(prometheus) | +| vtdriver.secondsBehindMaster | int | 7200 | 指定HealthCheck中判定tablet为可用的最大主从延迟,默认值为7200s | +| vtdriver.queryCoreSize | int | jdk1.8.0_131以前的版本默认值为8,之后的版本的默认值根据应用容器的cpu核数来设定,核数小于8取8,核数大于32取32,核数在8-32之间取应用cpu核数 | 执行SQL线程池核心线程数 | +| vtdriver.queryMaximumSize | int | 100 | 执行SQL线程池最大线程数 | +| vtdriver.queryQueueSize | int | 1000 | 执行SQL线程池任务队列长度 | +| vtdriver.queryRejectedTimeout | long | 3000 | 执行SQL线程池拒绝任务丢弃超时(毫秒) | +| vtdriver.healthCheckCoreSize | int | 10 | healthCheck线程池核心线程数 | +| vtdriver.healthCheckMaximumSize | int | 100 | healthCheck线程池最大线程数 | +| vtdriver.healthCheckQueueSize | int | 10000 | healthCheck线程池任务队列长度 | +| vtdriver.healthCheckRejectedTimeout | long | 3000 | healthCheck线程池拒绝任务丢弃超时(毫秒) | +| vtdriver.topoExecuteTimeout | long | 10000 | 访问拓扑元数据的执行超时时间(毫秒) | +| vtdriver.topoConnectTimeout | long | 5000 | 访问拓扑元数据连接超时时间(毫秒) | \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactory.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactory.java index a7e5ec7..0bfee53 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactory.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactory.java @@ -31,6 +31,12 @@ public class Etcd2TopoFactory implements TopoFactory { + private static final Duration KEEPALIVE_DURATION = Duration.ofSeconds(10L); + + private static final long CONNECT_TIMEOUT_LONG = Long.getLong("vtdriver.topoConnectTimeout", 5000L); + + private static final Duration CONNECT_DURATION = Duration.ofMillis(CONNECT_TIMEOUT_LONG); + private String clientCertPath = ""; private String clientKeyPath = ""; @@ -90,10 +96,10 @@ private Etcd2TopoServer newServerWithOpts(String serverAddr, String root, String } List endpoints = Util.toURIs(Arrays.asList(result)); Client client = Client.builder().endpoints(endpoints) - .keepaliveTimeout(Duration.ofSeconds(10L)) - .keepaliveTime(Duration.ofSeconds(10L)) + .keepaliveTimeout(KEEPALIVE_DURATION) + .keepaliveTime(KEEPALIVE_DURATION) .keepaliveWithoutCalls(true) - .connectTimeout(Duration.ofSeconds(5)) + .connectTimeout(CONNECT_DURATION) .waitForReady(false) .executorService(TabletNettyExecutorService.getNettyExecutorService()).build(); diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java index 20b9377..10c5f53 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java @@ -53,7 +53,7 @@ public class Etcd2TopoServer implements TopoConnection { private static final String END_TAG_OF_RANGE_SEARCH = "1"; - private static final long DEFALUT_TIMEOUT = 10L; + private static final long DEFALUT_TIMEOUT = Long.getLong("vtdriver.topoExecuteTimeout", 10000L); private static final ConcurrentMap WATCHER_MAP = new ConcurrentHashMap<>(16); @@ -86,7 +86,7 @@ public List listDir(IContext ctx, String dirPath, boolean isFull, bool CompletableFuture future = this.client.getKVClient().get(sequence, option); GetResponse response; try { - response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS); + response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.error(e.getMessage(), e); throw TopoException.wrap(e.getMessage()); @@ -196,7 +196,7 @@ public ConnGetResponse get(IContext ctx, String filePath, boolean ignoreNoNode) CompletableFuture future = this.client.getKVClient().get(sequence, option); GetResponse response; try { - response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS); + response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.error(e.getMessage(), e); throw TopoException.wrap(e.getMessage()); @@ -224,7 +224,7 @@ public List getTabletsByCell(IContext ctx, String filePath) thr CompletableFuture future = this.client.getKVClient().get(beginSequence, option); GetResponse response; try { - response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS); + response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (InterruptedException | ExecutionException | TimeoutException e) { logger.error(e.getMessage(), e); throw TopoException.wrap(e.getMessage()); @@ -270,7 +270,7 @@ public void watchSrvKeyspace(IContext ctx, String cell, String keyspace) throws CompletableFuture future = this.client.getKVClient().get(key, option); GetResponse initial; try { - initial = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS); + initial = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS); } catch (TimeoutException | InterruptedException | ExecutionException e) { throw TopoException.wrap(e.getMessage()); } diff --git a/src/test/java/com/jd/jdbc/discovery/MockTablet.java b/src/test/java/com/jd/jdbc/discovery/MockTablet.java index b4c2c5e..1dc4e39 100644 --- a/src/test/java/com/jd/jdbc/discovery/MockTablet.java +++ b/src/test/java/com/jd/jdbc/discovery/MockTablet.java @@ -1,3 +1,21 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package com.jd.jdbc.discovery; import com.jd.jdbc.queryservice.CombinedQueryService; diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactoryTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactoryTest.java new file mode 100644 index 0000000..11f6989 --- /dev/null +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactoryTest.java @@ -0,0 +1,50 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.topo.etcd2topo; + +import java.lang.reflect.Field; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +public class Etcd2TopoFactoryTest { + @Test + @Ignore + public void testTopoConnectTimeout() throws Exception { + long timeout = 50000; + System.setProperty("vtdriver.topoConnectTimeout", timeout + ""); + + new Etcd2TopoFactory(); + + Field field = Etcd2TopoFactory.class.getDeclaredField("CONNECT_TIMEOUT_LONG"); + field.setAccessible(true); + long connectTimeoutLong = (long) field.get(null); + Assert.assertEquals(timeout, connectTimeoutLong); + } + + @Test + public void testTopoConnectTimeout2() throws Exception { + new Etcd2TopoFactory(); + + Field field = Etcd2TopoFactory.class.getDeclaredField("CONNECT_TIMEOUT_LONG"); + field.setAccessible(true); + long connectTimeoutLong = (long) field.get(null); + Assert.assertEquals(5000, connectTimeoutLong); + } +} \ No newline at end of file diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java new file mode 100644 index 0000000..59da55f --- /dev/null +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java @@ -0,0 +1,71 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.topo.etcd2topo; + +import com.jd.jdbc.context.VtContext; +import com.jd.jdbc.topo.Topo; +import com.jd.jdbc.topo.TopoServer; +import com.jd.jdbc.vitess.VitessJdbcUrlParser; +import java.lang.reflect.Field; +import java.util.List; +import java.util.Properties; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; +import testsuite.TestSuite; +import testsuite.internal.TestSuiteShardSpec; + +public class Etcd2TopoServerTest extends TestSuite { + + @Test + @Ignore + public void testTopoExecuteTimeout() throws Exception { + long timeout = 50000; + System.setProperty("vtdriver.topoExecuteTimeout", timeout + ""); + + String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); + Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null); + String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); + TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + List cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); + String cell = cells.get(0); + topoServer.connForCell(null, cell); + + Field field = Etcd2TopoServer.class.getDeclaredField("DEFALUT_TIMEOUT"); + field.setAccessible(true); + long defalutTimeout = (long) field.get(null); + Assert.assertEquals(timeout, defalutTimeout); + } + + @Test + public void testTopoExecuteTimeout2() throws Exception { + String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); + Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null); + String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); + TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + List cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); + String cell = cells.get(0); + topoServer.connForCell(null, cell); + + Field field = Etcd2TopoServer.class.getDeclaredField("DEFALUT_TIMEOUT"); + field.setAccessible(true); + long defalutTimeout = (long) field.get(null); + Assert.assertEquals(10000L, defalutTimeout); + } +} \ No newline at end of file