Skip to content

Commit

Permalink
The establishment time and execution time for accessing the topology …
Browse files Browse the repository at this point in the history
…can be configured. (#136)
  • Loading branch information
wangweicugw authored Sep 9, 2023
1 parent 0d35e65 commit a205ef7
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 13 deletions.
20 changes: 15 additions & 5 deletions docs/properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,18 @@
### VtDriver中的系统参数
通过JVM参数方式传入,比如-Dvtdriver.api.port=9999

| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| vtdriver.api.port | int | 15002 | 指定开启的http端口 |
| vtdriver.monitor.port | int | 15001 | 指定开启的http端口(prometheus) |
| vtdriver.secondsBehindMaster | int | 7200 | 指定HealthCheck中判定tablet为可用的最大主从延迟,默认值为7200s |
| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|-------------------------------------------------------------------------------------|--------------------------------------------|
| vtdriver.api.port | int | 15002 | 指定开启的http端口 |
| vtdriver.monitor.port | int | 15001 | 指定开启的http端口(prometheus) |
| vtdriver.secondsBehindMaster | int | 7200 | 指定HealthCheck中判定tablet为可用的最大主从延迟,默认值为7200s |
| vtdriver.queryCoreSize | int | jdk1.8.0_131以前的版本默认值为8,之后的版本的默认值根据应用容器的cpu核数来设定,核数小于8取8,核数大于32取32,核数在8-32之间取应用cpu核数 | 执行SQL线程池核心线程数 |
| vtdriver.queryMaximumSize | int | 100 | 执行SQL线程池最大线程数 |
| vtdriver.queryQueueSize | int | 1000 | 执行SQL线程池任务队列长度 |
| vtdriver.queryRejectedTimeout | long | 3000 | 执行SQL线程池拒绝任务丢弃超时(毫秒) |
| vtdriver.healthCheckCoreSize | int | 10 | healthCheck线程池核心线程数 |
| vtdriver.healthCheckMaximumSize | int | 100 | healthCheck线程池最大线程数 |
| vtdriver.healthCheckQueueSize | int | 10000 | healthCheck线程池任务队列长度 |
| vtdriver.healthCheckRejectedTimeout | long | 3000 | healthCheck线程池拒绝任务丢弃超时(毫秒) |
| vtdriver.topoExecuteTimeout | long | 10000 | 访问拓扑元数据的执行超时时间(毫秒) |
| vtdriver.topoConnectTimeout | long | 5000 | 访问拓扑元数据连接超时时间(毫秒) |
12 changes: 9 additions & 3 deletions src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@

public class Etcd2TopoFactory implements TopoFactory {

private static final Duration KEEPALIVE_DURATION = Duration.ofSeconds(10L);

private static final long CONNECT_TIMEOUT_LONG = Long.getLong("vtdriver.topoConnectTimeout", 5000L);

private static final Duration CONNECT_DURATION = Duration.ofMillis(CONNECT_TIMEOUT_LONG);

private String clientCertPath = "";

private String clientKeyPath = "";
Expand Down Expand Up @@ -90,10 +96,10 @@ private Etcd2TopoServer newServerWithOpts(String serverAddr, String root, String
}
List<URI> endpoints = Util.toURIs(Arrays.asList(result));
Client client = Client.builder().endpoints(endpoints)
.keepaliveTimeout(Duration.ofSeconds(10L))
.keepaliveTime(Duration.ofSeconds(10L))
.keepaliveTimeout(KEEPALIVE_DURATION)
.keepaliveTime(KEEPALIVE_DURATION)
.keepaliveWithoutCalls(true)
.connectTimeout(Duration.ofSeconds(5))
.connectTimeout(CONNECT_DURATION)
.waitForReady(false)
.executorService(TabletNettyExecutorService.getNettyExecutorService()).build();

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Etcd2TopoServer implements TopoConnection {

private static final String END_TAG_OF_RANGE_SEARCH = "1";

private static final long DEFALUT_TIMEOUT = 10L;
private static final long DEFALUT_TIMEOUT = Long.getLong("vtdriver.topoExecuteTimeout", 10000L);

private static final ConcurrentMap<String, Watch.Watcher> WATCHER_MAP = new ConcurrentHashMap<>(16);

Expand Down Expand Up @@ -86,7 +86,7 @@ public List<DirEntry> listDir(IContext ctx, String dirPath, boolean isFull, bool
CompletableFuture<GetResponse> future = this.client.getKVClient().get(sequence, option);
GetResponse response;
try {
response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS);
response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error(e.getMessage(), e);
throw TopoException.wrap(e.getMessage());
Expand Down Expand Up @@ -196,7 +196,7 @@ public ConnGetResponse get(IContext ctx, String filePath, boolean ignoreNoNode)
CompletableFuture<GetResponse> future = this.client.getKVClient().get(sequence, option);
GetResponse response;
try {
response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS);
response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error(e.getMessage(), e);
throw TopoException.wrap(e.getMessage());
Expand Down Expand Up @@ -224,7 +224,7 @@ public List<ConnGetResponse> getTabletsByCell(IContext ctx, String filePath) thr
CompletableFuture<GetResponse> future = this.client.getKVClient().get(beginSequence, option);
GetResponse response;
try {
response = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS);
response = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error(e.getMessage(), e);
throw TopoException.wrap(e.getMessage());
Expand Down Expand Up @@ -270,7 +270,7 @@ public void watchSrvKeyspace(IContext ctx, String cell, String keyspace) throws
CompletableFuture<GetResponse> future = this.client.getKVClient().get(key, option);
GetResponse initial;
try {
initial = future.get(DEFALUT_TIMEOUT, TimeUnit.SECONDS);
initial = future.get(DEFALUT_TIMEOUT, TimeUnit.MILLISECONDS);
} catch (TimeoutException | InterruptedException | ExecutionException e) {
throw TopoException.wrap(e.getMessage());
}
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/com/jd/jdbc/discovery/MockTablet.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.discovery;

import com.jd.jdbc.queryservice.CombinedQueryService;
Expand Down
50 changes: 50 additions & 0 deletions src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoFactoryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.topo.etcd2topo;

import java.lang.reflect.Field;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

public class Etcd2TopoFactoryTest {
@Test
@Ignore
public void testTopoConnectTimeout() throws Exception {
long timeout = 50000;
System.setProperty("vtdriver.topoConnectTimeout", timeout + "");

new Etcd2TopoFactory();

Field field = Etcd2TopoFactory.class.getDeclaredField("CONNECT_TIMEOUT_LONG");
field.setAccessible(true);
long connectTimeoutLong = (long) field.get(null);
Assert.assertEquals(timeout, connectTimeoutLong);
}

@Test
public void testTopoConnectTimeout2() throws Exception {
new Etcd2TopoFactory();

Field field = Etcd2TopoFactory.class.getDeclaredField("CONNECT_TIMEOUT_LONG");
field.setAccessible(true);
long connectTimeoutLong = (long) field.get(null);
Assert.assertEquals(5000, connectTimeoutLong);
}
}
71 changes: 71 additions & 0 deletions src/test/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.topo.etcd2topo;

import com.jd.jdbc.context.VtContext;
import com.jd.jdbc.topo.Topo;
import com.jd.jdbc.topo.TopoServer;
import com.jd.jdbc.vitess.VitessJdbcUrlParser;
import java.lang.reflect.Field;
import java.util.List;
import java.util.Properties;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import testsuite.TestSuite;
import testsuite.internal.TestSuiteShardSpec;

public class Etcd2TopoServerTest extends TestSuite {

@Test
@Ignore
public void testTopoExecuteTimeout() throws Exception {
long timeout = 50000;
System.setProperty("vtdriver.topoExecuteTimeout", timeout + "");

String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS));
Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null);
String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port");
TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress);
List<String> cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background()));
String cell = cells.get(0);
topoServer.connForCell(null, cell);

Field field = Etcd2TopoServer.class.getDeclaredField("DEFALUT_TIMEOUT");
field.setAccessible(true);
long defalutTimeout = (long) field.get(null);
Assert.assertEquals(timeout, defalutTimeout);
}

@Test
public void testTopoExecuteTimeout2() throws Exception {
String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS));
Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null);
String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port");
TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress);
List<String> cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background()));
String cell = cells.get(0);
topoServer.connForCell(null, cell);

Field field = Etcd2TopoServer.class.getDeclaredField("DEFALUT_TIMEOUT");
field.setAccessible(true);
long defalutTimeout = (long) field.get(null);
Assert.assertEquals(10000L, defalutTimeout);
}
}

0 comments on commit a205ef7

Please sign in to comment.