From 94acdafda616889087f645e5e1f2e7a63253d700 Mon Sep 17 00:00:00 2001
From: Jin-2019 <64535472+Jin-2019@users.noreply.github.com>
Date: Fri, 11 Aug 2023 17:34:30 +0800
Subject: [PATCH] read master where slava not surviving (#128)
---
docs/properties.md | 22 +-
pom.xml | 7 +
.../java/com/jd/jdbc/common/Constant.java | 2 +
.../com/jd/jdbc/discovery/HealthCheck.java | 20 ++
.../queryservice/RetryTabletQueryService.java | 46 +--
.../com/jd/jdbc/queryservice/RoleType.java | 54 +++
.../jd/jdbc/queryservice/util/RoleUtils.java | 25 +-
.../com/jd/jdbc/vitess/VitessConnection.java | 8 +-
.../jd/jdbc/vitess/VitessJdbcProperyUtil.java | 1 +
.../com/jd/jdbc/vitess/VitessStatement.java | 1 -
.../jd/jdbc/discovery/HealthCheckTest.java | 68 ++--
.../jd/jdbc/discovery/ReadWriteSplitTest.java | 310 ++++++++++++++++++
.../jd/jdbc/queryservice/MockQueryServer.java | 7 +-
.../jdbc/queryservice/util/RoleUtilsTest.java | 63 ++++
.../vitess/VitessDriverReadWriteSplit.java | 79 ++++-
src/test/java/testsuite/TestSuite.java | 8 +
16 files changed, 632 insertions(+), 89 deletions(-)
create mode 100644 src/main/java/com/jd/jdbc/queryservice/RoleType.java
create mode 100644 src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java
create mode 100644 src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java
diff --git a/docs/properties.md b/docs/properties.md
index 7d05da3..1ee479b 100644
--- a/docs/properties.md
+++ b/docs/properties.md
@@ -6,10 +6,9 @@
| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
-| cell | String | | 应用接入时,传入的cell需考虑跨机房切换,传入多个 |
+| cell | String | | 应用接入时,传入的cell需考虑跨机房切换,传入多个 |
| deepPaginationThreshold | int | 1000000000 | 用来设置深度分页优化的临界值,超过此参数大小会开启深度分页优化 |
-| role | String | rw | 用来配置读写分离,默认role=rw,role=rr时只读 |
-| role | String | | role=rr时优先读取replica节点,role=ro时读取rdonly节点 |
+| role | String | rw | 用来配置读写分离,默认role=rw
role=rr 优先读replica,replica不可用时读rdonly
role=rrm 优先读replica,replica不可用时读rdonly,rdonly不可用时读取master
role=ro 读rdonly,rdonly不可用时报错。需要注意的是,rdonly节点一般用于大数据抽数和备份,OOM风险高于其他节点|
| vtPlanCacheCapacity | int | 300 | 该参数用来设置执行计划缓存cache大小,最大值10240 |
| queryConsolidator | boolean | false | 用来开启Consolidator,仅在role=rr场景生效;相同的sql语句只执行一次,其余线程等待第一次查询返回结果后返回 |
| queryParallelNum | int | 1 | 在分表场景下,执行事务外的SQL语句时每个分片上可开启的最大并发数 |
@@ -22,7 +21,7 @@
| password | String | | 连接时使用的密码。 |
| characterEncoding | String | utf8 | 是指定所处理字符的解码和编码的格式,或者说是标准。若项目的字符集和MySQL数据库字符集设置为同一字符集则url可以不加此参数。 |
| serverTimezone | String | | 设置时区 |
-| socketTimeout | int | 10000 | 查询超时时间,最小值不得小于1000,小于1000时默认设置为1000 |
+| socketTimeout | int | 10000 | 查询超时时间,最小值不得小于1000,小于1000时默认设置为1000 |
| allowMultiQueries| boolean| true| 在一条语句中,允许使用“;”来分隔多条查询。不可更改,VtDriver强制设置为true|
| maxAllowedPacket | byte | 65535(64k) | 设置server接受的数据包的大小 |
| zeroDateTimeBehavior | String | exception | JAVA连接MySQL数据库,在操作值为0的timestamp类型时不能正确的处理,而是默认抛出一个异常。参数,exception:默认值;convertToNull:将日期转换成NULL值;round:替换成最近的日期 |
@@ -34,15 +33,12 @@
| connectTimeout | long | 0 | 套接字连接的超时(单位为毫秒),0表示无超时 |
| useSSL | boolean | false | 在与服务器通信时使用SSL |
| useAffectedRows | boolean | false | 当连接到服务器时不要设置“client_found_rows”标签 (这个是不符合JDBC标准的,它会破坏大部分依赖“found”VS DML语句下的”affected”应用程序)。但是会导致“insert”里面的“Correct”更新数据。服务器会返回“ON Duplicate Key update”的状态 |
-| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch(), 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 |
+| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch(), 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 |
##### 3.线程池参数(内部线程池仅以第一次创建Connection的参数为准)
| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
-| daemonCoreSize | int | 10 | Daemon线程池核心线程数 |
-| daemonMaximumSize | int | 100 | Daemon线程池最大线程数 |
-| daemonRejectedTimeout | long | 3000 | Daemon线程池拒绝任务丢弃超时(毫秒) |
| queryCoreSize | int | 当前cpu核心线程数 | 执行SQL线程池核心线程数 |
| queryMaximumSize | int | 100 | 执行SQL线程池最大线程数 |
| queryQueueSize | int | 1000 | 执行SQL线程池任务队列长度 |
@@ -65,12 +61,12 @@
| vtMinimumIdle | int | 5 | 最小连接数(初始连接数) |
| vtValidationTimeout | long | 5000 | 此属性控制连接测试活动的最长时间。该值必须小于connectionTimeout。最低可接受的验证超时为250毫秒。 |
-*注*:`vtMinimumIdle`和`vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时, 默认值有所不同:
+*注*:`vtMinimumIdle`和`vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时, 默认值有所不同:
-- 分片数>=8且<16时, `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。
-- 分片数>=16且<32时, `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。
-- 分片数>=32且<64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。
-- 分片数>=64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。
+- 分片数>=8且<16时, `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。
+- 分片数>=16且<32时, `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。
+- 分片数>=32且<64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。
+- 分片数>=64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。
### VtDriver中的系统参数
通过JVM参数方式传入,比如-Dvtdriver.api.port=9999
diff --git a/pom.xml b/pom.xml
index f87f4ff..014b0c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -59,6 +59,7 @@
3.2.10
1.45.1
1.97.1
+ 3.2.2
@@ -228,6 +229,12 @@
${jackson.version}
test
+
+ commons-collections
+ commons-collections
+ ${commons-collections.version}
+ test
+
diff --git a/src/main/java/com/jd/jdbc/common/Constant.java b/src/main/java/com/jd/jdbc/common/Constant.java
index 9cd9911..64718ff 100644
--- a/src/main/java/com/jd/jdbc/common/Constant.java
+++ b/src/main/java/com/jd/jdbc/common/Constant.java
@@ -35,6 +35,8 @@ public class Constant {
public static final String DRIVER_PROPERTY_ROLE_RO = "ro";
+ public static final String DRIVER_PROPERTY_ROLE_RRM = "rrm";
+
public static final String DRIVER_PROPERTY_SCHEMA = "schema";
public static final String DRIVER_PROPERTY_QUERY_CONSOLIDATOR = "queryConsolidator";
diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java
index cda776a..b645c6f 100644
--- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java
+++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java
@@ -24,6 +24,7 @@
import com.jd.jdbc.monitor.HealthCheckCollector;
import com.jd.jdbc.pool.StatefulConnectionPool;
import com.jd.jdbc.queryservice.IQueryService;
+import com.jd.jdbc.queryservice.RoleType;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqlparser.utils.StringUtils;
@@ -167,6 +168,25 @@ public List getHealthyTabletStatsMaybeStandby(Query.Target ta
return healthyTabletStats;
}
+ public List getTabletHealthChecks(final Query.Target target, final RoleType roleType) {
+ Query.Target queryTarget = target;
+ if (target.getShard().isEmpty()) {
+ queryTarget = target.toBuilder().setShard("0").build();
+ }
+ List tablets = this.getHealthyTabletStats(queryTarget);
+ if (CollectionUtils.isEmpty(tablets) && Objects.equals(Topodata.TabletType.REPLICA, queryTarget.getTabletType())) {
+ Topodata.TabletType[] tabletTypes = roleType.getTabletTypes();
+ int i = 1;
+ while (CollectionUtils.isEmpty(tablets) && i < tabletTypes.length) {
+ Topodata.TabletType tabletType = tabletTypes[i];
+ queryTarget = queryTarget.toBuilder().setTabletType(tabletType).build();
+ tablets = this.getHealthyTabletStats(queryTarget);
+ i++;
+ }
+ }
+ return tablets;
+ }
+
public List getHealthyTabletStats(Query.Target target) {
this.lock.lock();
try {
diff --git a/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java b/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java
index ed54318..93f677f 100644
--- a/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java
+++ b/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java
@@ -21,6 +21,7 @@
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.discovery.HealthCheck;
import com.jd.jdbc.discovery.TabletHealthCheck;
+import com.jd.jdbc.queryservice.util.RoleUtils;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqltypes.BatchVtResultSet;
@@ -42,7 +43,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Set;
public class RetryTabletQueryService implements IQueryService, IHealthCheckQueryService {
@@ -78,7 +78,7 @@ public Query.CommitResponse commit(IContext context, Query.Target target, Long t
throw e;
}
}
- }).responses;
+ }, RoleUtils.getRoleType(context)).responses;
}
@Override
@@ -97,7 +97,7 @@ public Query.RollbackResponse rollback(IContext context, Query.Target target, Lo
throw e;
}
}
- }).responses;
+ }, RoleUtils.getRoleType(context)).responses;
}
@Override
@@ -107,7 +107,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List
log.debug("beginExecute --> " + sql);
}
- boolean inDedicatedConn = reservedID != 0;
+ boolean inDedicatedConn = (reservedID != 0);
return (BeginVtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> {
try {
BeginVtResultSet beginVtResultSet = qs.beginExecute(context, target, preQuries, sql, bindVariables, reservedID, options);
@@ -123,7 +123,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List
throw e;
}
}
- }).resultSetMessage;
+ }, RoleUtils.getRoleType(context)).resultSetMessage;
}
@Override
@@ -132,7 +132,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma
if (log.isDebugEnabled()) {
log.debug("execute [" + transactionID + "] --> " + sql);
}
- boolean inDedicatedConn = reservedID != 0 || transactionID != 0;
+ boolean inDedicatedConn = (reservedID != 0 || transactionID != 0);
return (VtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> {
try {
VtResultSet ret = qs.execute(context, target, sql, bindVariables, transactionID, reservedID, options);
@@ -148,7 +148,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma
throw e;
}
}
- }).resultSetMessage;
+ }, RoleUtils.getRoleType(context)).resultSetMessage;
}
@Override
@@ -185,7 +185,7 @@ public BatchVtResultSet executeBatch(IContext context, Query.Target target, List
throw e;
}
}
- }).resultSetMessage;
+ }, RoleUtils.getRoleType(context)).resultSetMessage;
}
@Override
@@ -202,7 +202,7 @@ public BeginBatchVtResultSet beginExecuteBatch(IContext context, Query.Target ta
throw e;
}
}
- }).resultSetMessage;
+ }, RoleUtils.getRoleType(context)).resultSetMessage;
}
@Override
@@ -230,7 +230,7 @@ public Query.ReleaseResponse release(IContext context, Query.Target target, Long
return (Query.ReleaseResponse) retry(inDedicatedConn, target, (IQueryService qs) -> {
Query.ReleaseResponse ret = qs.release(context, target, transactionID, reservedID);
return new IInner.InnerResult(false, ret);
- }).responses;
+ }, RoleUtils.getRoleType(context)).responses;
}
@Override
@@ -242,7 +242,7 @@ private boolean canRetry(IContext context, Exception e) {
return !context.isDone() && (e instanceof SQLRecoverableException);
}
- private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner) throws SQLException {
+ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner, RoleType roleType) throws SQLException {
if (inTransaction && target.getTabletType() != Topodata.TabletType.MASTER) {
throw new SQLException("gateway's query service can only be used for non-transactional queries on replica and rdonly");
@@ -251,7 +251,7 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
Set invalidTablets = new HashSet<>();
for (int i = 0; i < RETRY_COUNT + 1; i++) {
- List typeTablets = getTabletHealthChecks(target);
+ List typeTablets = ((TabletGateway) gateway).getHc().getTabletHealthChecks(target, roleType);
if (typeTablets == null || typeTablets.size() == 0) {
throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". no valid tablet");
@@ -272,10 +272,12 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
}
}
+ // not used for replica till now, so use a simple shuffle().
TabletHealthCheck tablet = getRandomTablet(tablets);
if (tablet == null) {
throw new SQLException("no available connection");
}
+
if (tablet.getQueryService() == null) {
invalidTablets.add(TopoProto.tabletAliasString(tablet.getTablet().getAlias()));
continue;
@@ -296,28 +298,10 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". all tablets are tried, no available connection");
}
- private List getTabletHealthChecks(Query.Target target) {
- if (target.getShard().isEmpty()) {
- target = target.toBuilder().setShard("0").build();
- }
- List typeTablets;
- if (Objects.equals(Topodata.TabletType.REPLICA, target.getTabletType())) {
- typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target);
- if (typeTablets == null || typeTablets.isEmpty()) {
- typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target.toBuilder().setTabletType(Topodata.TabletType.RDONLY).build());
- }
- } else {
- typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target);
- }
- return typeTablets;
- }
-
private TabletHealthCheck getRandomTablet(List tablets) {
- TabletHealthCheck tablet;
if (tablets.size() > 1) {
Collections.shuffle(tablets);
}
- tablet = tablets.get(0);
- return tablet;
+ return tablets.get(0);
}
}
\ No newline at end of file
diff --git a/src/main/java/com/jd/jdbc/queryservice/RoleType.java b/src/main/java/com/jd/jdbc/queryservice/RoleType.java
new file mode 100644
index 0000000..da2a971
--- /dev/null
+++ b/src/main/java/com/jd/jdbc/queryservice/RoleType.java
@@ -0,0 +1,54 @@
+/*
+Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
+
+Copyright 2019 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.jd.jdbc.queryservice;
+
+import io.vitess.proto.Topodata;
+import java.util.Arrays;
+import lombok.Getter;
+
+public class RoleType {
+
+ @Getter
+ private final Topodata.TabletType[] tabletTypes;
+
+ public RoleType(Topodata.TabletType... tabletTypes) {
+ this.tabletTypes = tabletTypes;
+ }
+
+ public Topodata.TabletType getTargetTabletType() {
+ return tabletTypes[0];
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RoleType roleType = (RoleType) o;
+ return Arrays.equals(tabletTypes, roleType.tabletTypes);
+ }
+
+ @Override
+ public int hashCode() {
+ return Arrays.hashCode(tabletTypes);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java b/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java
index 5f3ab51..7c3bc00 100644
--- a/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java
+++ b/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java
@@ -18,14 +18,37 @@
import com.jd.jdbc.common.Constant;
import com.jd.jdbc.context.IContext;
+import com.jd.jdbc.queryservice.RoleType;
import io.vitess.proto.Topodata;
+import java.sql.SQLException;
public class RoleUtils {
+
public static boolean notMaster(IContext ctx) {
return getTabletType(ctx) != Topodata.TabletType.MASTER;
}
public static Topodata.TabletType getTabletType(IContext ctx) {
- return (Topodata.TabletType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY);
+ RoleType roleType = getRoleType(ctx);
+ return roleType.getTargetTabletType();
+ }
+
+ public static RoleType getRoleType(IContext ctx) {
+ return (RoleType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY);
+ }
+
+ public static RoleType buildRoleType(String role) throws SQLException {
+ switch (role.toLowerCase()) {
+ case Constant.DRIVER_PROPERTY_ROLE_RW:
+ return new RoleType(Topodata.TabletType.MASTER);
+ case Constant.DRIVER_PROPERTY_ROLE_RR:
+ return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
+ case Constant.DRIVER_PROPERTY_ROLE_RO:
+ return new RoleType(Topodata.TabletType.RDONLY);
+ case Constant.DRIVER_PROPERTY_ROLE_RRM:
+ return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
+ default:
+ throw new SQLException("'role=" + role + "' " + "error in jdbc url");
+ }
}
}
diff --git a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java
index 2493523..64ab864 100755
--- a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java
+++ b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java
@@ -26,6 +26,7 @@
import com.jd.jdbc.pool.InnerConnection;
import com.jd.jdbc.pool.StatefulConnectionPool;
import com.jd.jdbc.queryservice.IParentQueryService;
+import com.jd.jdbc.queryservice.RoleType;
import com.jd.jdbc.queryservice.TabletDialer;
import com.jd.jdbc.queryservice.util.RoleUtils;
import com.jd.jdbc.session.SafeSession;
@@ -117,7 +118,7 @@ public VitessConnection(String url, Properties prop, TopoServer topoServer, Reso
this.executor = com.jd.jdbc.Executor.getInstance(Utils.getInteger(prop, "vtPlanCacheCapacity"));
this.vm = vSchemaManager;
this.ctx = VtContext.withCancel(VtContext.background());
- this.ctx.setContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY, VitessJdbcProperyUtil.getTabletType(prop));
+ this.ctx.setContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY, getRoleType(prop));
this.ctx.setContextValue(ContextKey.CTX_TOPOSERVER, topoServer);
this.ctx.setContextValue(ContextKey.CTX_SCATTER_CONN, resolver.getScatterConn());
this.ctx.setContextValue(ContextKey.CTX_TX_CONN, resolver.getScatterConn().getTxConn());
@@ -495,6 +496,11 @@ public List getActualTables(String logicTableName) {
return actualTables;
}
+ private RoleType getRoleType(Properties prop) throws SQLException {
+ String role = prop.getProperty(Constant.DRIVER_PROPERTY_ROLE_KEY, Constant.DRIVER_PROPERTY_ROLE_RW);
+ return RoleUtils.buildRoleType(role);
+ }
+
public enum ContextKey {
CTX_TOPOSERVER,
CTX_SCATTER_CONN,
diff --git a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java
index 0d37d66..6b22654 100644
--- a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java
+++ b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java
@@ -118,6 +118,7 @@ public static Topodata.TabletType getTabletType(Properties props) throws SQLExce
case Constant.DRIVER_PROPERTY_ROLE_RW:
return Topodata.TabletType.MASTER;
case Constant.DRIVER_PROPERTY_ROLE_RR:
+ case Constant.DRIVER_PROPERTY_ROLE_RRM:
return Topodata.TabletType.REPLICA;
case Constant.DRIVER_PROPERTY_ROLE_RO:
return Topodata.TabletType.RDONLY;
diff --git a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java
index 9d2522f..0e5d817 100755
--- a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java
+++ b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java
@@ -72,7 +72,6 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
index f88d41b..212609d 100644
--- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
+++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
@@ -85,7 +85,6 @@ public static void initPool() {
HealthCheck.resetHealthCheck();
TabletDialerAgent.clearTabletCache();
TopologyWatcherManager.INSTANCE.close();
-
VtHealthCheckExecutorService.initialize(null, null, null, null);
VtQueryExecutorService.initialize(null, null, null, null);
}
@@ -116,7 +115,7 @@ public void resetHealthCheck() {
*/
@Test
- public void testHealthCheck() throws InterruptedException, IOException {
+ public void testHealthCheck() throws InterruptedException {
printComment("1. HealthCheck Test");
printComment("a. Get Health");
@@ -155,7 +154,7 @@ public void testHealthCheck() throws InterruptedException, IOException {
* 3. testing if tablet can be remove from healthy when receive error message from tablet server
*/
@Test
- public void testHealthCheckStreamError() throws IOException, InterruptedException {
+ public void testHealthCheckStreamError() throws InterruptedException {
printComment("2. HealthCheck Test for Error Stream Message");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -196,7 +195,7 @@ public void testHealthCheckStreamError() throws IOException, InterruptedExceptio
* 3. testing if changing the type of tablet to primary;
*/
@Test
- public void testHealthCheckExternalReparent() throws IOException, InterruptedException {
+ public void testHealthCheckExternalReparent() throws InterruptedException {
printComment("3. HealthCheck Test one tablet External Reparent");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -242,7 +241,7 @@ public void testHealthCheckExternalReparent() throws IOException, InterruptedExc
}
@Test
- public void testHealthCheckTwoExternalReparent() throws IOException, InterruptedException {
+ public void testHealthCheckTwoExternalReparent() throws InterruptedException {
printComment("4. HealthCheck Test two tablets External Reparent");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -296,7 +295,7 @@ public void testHealthCheckTwoExternalReparent() throws IOException, Interrupted
}
@Test
- public void testHealthCheckVerifiesTabletAlias() throws IOException, InterruptedException {
+ public void testHealthCheckVerifiesTabletAlias() throws InterruptedException {
printComment("5. HealthCheck Test receive a mismatch tablet info");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -325,7 +324,7 @@ public void testHealthCheckVerifiesTabletAlias() throws IOException, Interrupted
}
@Test
- public void testHealthCheckRemoveTabletAfterReparent() throws IOException, InterruptedException {
+ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedException {
printComment("6. HealthCheck Test remove tablet after reparent");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -378,7 +377,7 @@ public void testHealthCheckRemoveTabletAfterReparent() throws IOException, Inter
}
@Test
- public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedException {
+ public void testHealthCheckOnNextBeforeRemove() throws InterruptedException {
printComment("6a. HealthCheck Test onNext before remove tablet");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -413,7 +412,7 @@ public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedE
}
@Test
- public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedException {
+ public void testHealthCheckOnNextAfterRemove() throws InterruptedException {
printComment("6b. HealthCheck Test onNext after remove tablet");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -448,7 +447,7 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx
}
@Test
- public void testHealthCheckTimeout() throws IOException, InterruptedException {
+ public void testHealthCheckTimeout() throws InterruptedException {
printComment("7. HealthCheck Test when health check timeout");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -495,7 +494,7 @@ public void testHealthCheckTimeout() throws IOException, InterruptedException {
* @throws InterruptedException
*/
@Test
- public void testGetHealthyTablet() throws IOException, InterruptedException {
+ public void testGetHealthyTablet() throws InterruptedException {
printComment("9. HealthCheck Test the functionality of getHealthyTabletStats");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -628,7 +627,7 @@ public void testGetHealthyTablet() throws IOException, InterruptedException {
}
@Test
- public void testPrimaryInOtherCell() throws TopoException, IOException, InterruptedException {
+ public void testPrimaryInOtherCell() throws TopoException, InterruptedException {
TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer();
startWatchTopo("k", topoServer, "cell1", "cell2");
@@ -655,7 +654,7 @@ public void testPrimaryInOtherCell() throws TopoException, IOException, Interrup
}
@Test
- public void testReplicaInOtherCell() throws TopoException, IOException, InterruptedException {
+ public void testReplicaInOtherCell() throws TopoException, InterruptedException {
TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer();
startWatchTopo("k", topoServer, "cell1", "cell2");
@@ -696,7 +695,7 @@ public void testReplicaInOtherCell() throws TopoException, IOException, Interrup
}
@Test
- public void testGetStandbyTablet() throws IOException, InterruptedException {
+ public void testGetStandbyTablet() throws InterruptedException {
printComment("12. HealthCheck Test get healthy tablet maybe standby");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -749,7 +748,7 @@ public void testGetStandbyTablet() throws IOException, InterruptedException {
}
@Test
- public void testUnhealthyReplicaAsSecondsBehind() throws IOException, InterruptedException {
+ public void testUnhealthyReplicaAsSecondsBehind() throws InterruptedException {
printComment("13. HealthCheck Test get healthy tablet maybe standby");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -780,7 +779,7 @@ public void testUnhealthyReplicaAsSecondsBehind() throws IOException, Interrupte
}
@Test
- public void testMysqlPort0to3358() throws IOException, InterruptedException {
+ public void testMysqlPort0to3358() throws InterruptedException {
printComment("14. HealthCheck Test in Tablet MySQL port changed from 0 to 3358");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -815,7 +814,7 @@ public void testMysqlPort0to3358() throws IOException, InterruptedException {
}
@Test
- public void testMysqlPort3358to0() throws IOException, InterruptedException {
+ public void testMysqlPort3358to0() throws InterruptedException {
printComment("15. HealthCheck Test in Tablet MySQL port changed from 3358 to 0");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -852,7 +851,7 @@ public void testMysqlPort3358to0() throws IOException, InterruptedException {
}
@Test
- public void testDoubleMaster() throws IOException, InterruptedException {
+ public void testDoubleMaster() throws InterruptedException {
printComment("16. double master one no serving");
printComment("a. Get Health");
HealthCheck hc = getHealthCheck();
@@ -884,7 +883,7 @@ public void testDoubleMaster() throws IOException, InterruptedException {
Thread.sleep(200);
printComment("e. Modify old master Tablet to no serving");
- sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, false, 0, 0.5, 0);
+ sendOnErrorMessage(mockTablet);
Thread.sleep(200);
Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
@@ -931,7 +930,7 @@ public void testHealthyListChecksum() {
}
@Test
- public void testHealthyChecksumSetBehindMaster() throws IOException, InterruptedException {
+ public void testHealthyChecksumSetBehindMaster() throws InterruptedException {
HealthCheck hc = getHealthCheck();
// add tablet
String keyInHealthy = "k.s.replica";
@@ -1000,7 +999,7 @@ public void testConcurrentModificationException() throws InterruptedException {
}
@Test
- public void testHealthyConcurrentModificationException() throws InterruptedException, IOException {
+ public void testHealthyConcurrentModificationException() throws InterruptedException {
HealthCheck hc = getHealthCheck();
String keyspace = "k";
@@ -1078,7 +1077,7 @@ private void closeQueryService(MockTablet... tablets) throws InterruptedExceptio
}
}
- private HealthCheck getHealthCheck() {
+ protected HealthCheck getHealthCheck() {
HealthCheck hc = HealthCheck.INSTANCE;
Assert.assertEquals(0, hc.getHealthByAliasCopy().size());
Assert.assertEquals(0, hc.getHealthyCopy().size());
@@ -1096,11 +1095,16 @@ private void assertTabletHealthCheck(TabletHealthCheck actualTabletHealthCheck,
Assert.assertEquals("Wrong realtime stats", expectStats, actualTabletHealthCheck.getStats());
}
- private MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) throws IOException {
+ protected MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) {
String serverName = InProcessServerBuilder.generateName();
BlockingQueue healthMessage = new ArrayBlockingQueue<>(2);
MockQueryServer queryServer = new MockQueryServer(healthMessage);
- Server server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
+ Server server = null;
+ try {
+ server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
grpcCleanup.register(server);
@@ -1142,16 +1146,24 @@ private Query.Target createTarget(Topodata.TabletType type) {
return Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(type).build();
}
- private void sendOnNextMessage(MockTablet mockTablet, Topodata.TabletType type, boolean isServing, int reparentedTimestamp, double cpuUsage, int secondsBehindMaster) throws InterruptedException {
+ protected void sendOnNextMessage(MockTablet mockTablet, Topodata.TabletType type, boolean isServing, int reparentedTimestamp, double cpuUsage, int secondsBehindMaster) {
Query.Target target = createTarget(type);
Query.StreamHealthResponse streamHealthResponse = createStreamHealthResponse(mockTablet.getTablet().getAlias(), target, isServing, reparentedTimestamp, cpuUsage, secondsBehindMaster);
MockQueryServer.HealthCheckMessage message = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Next, streamHealthResponse);
- mockTablet.getHealthMessage().put(message);
+ try {
+ mockTablet.getHealthMessage().put(message);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
- private void sendOnErrorMessage(MockTablet mockTablet) throws InterruptedException {
+ protected void sendOnErrorMessage(MockTablet mockTablet) {
MockQueryServer.HealthCheckMessage error = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Error, null);
- mockTablet.getHealthMessage().put(error);
+ try {
+ mockTablet.getHealthMessage().put(error);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
@AllArgsConstructor
diff --git a/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java b/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java
new file mode 100644
index 0000000..913f951
--- /dev/null
+++ b/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java
@@ -0,0 +1,310 @@
+/*
+Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
+
+Copyright 2019 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.jd.jdbc.discovery;
+
+import com.google.common.collect.Lists;
+import com.jd.jdbc.queryservice.CombinedQueryService;
+import com.jd.jdbc.queryservice.IParentQueryService;
+import com.jd.jdbc.queryservice.MockQueryServer;
+import com.jd.jdbc.queryservice.RoleType;
+import com.jd.jdbc.queryservice.TabletDialerAgent;
+import com.jd.jdbc.queryservice.util.RoleUtils;
+import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService;
+import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService;
+import io.grpc.ManagedChannel;
+import io.grpc.Server;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import io.grpc.inprocess.InProcessServerBuilder;
+import io.grpc.testing.GrpcCleanupRule;
+import io.vitess.proto.Query;
+import io.vitess.proto.Topodata;
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.TimeUnit;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import testsuite.TestSuite;
+
+public class ReadWriteSplitTest extends TestSuite {
+ private static final Map PORT_MAP = new HashMap<>();
+
+ private static RoleType rrRoleType;
+
+ private static RoleType rwRoleType;
+
+ private static RoleType roRoleType;
+
+ private static RoleType rrmRoleType;
+
+ @Rule
+ public GrpcCleanupRule grpcCleanup;
+
+ static {
+ PORT_MAP.put("vt", 1);
+ PORT_MAP.put("grpc", 2);
+ try {
+ rrRoleType = RoleUtils.buildRoleType("rr");
+ rwRoleType = RoleUtils.buildRoleType("rw");
+ roRoleType = RoleUtils.buildRoleType("ro");
+ rrmRoleType = RoleUtils.buildRoleType("rrm");
+ } catch (SQLException throwables) {
+ throwables.printStackTrace();
+ }
+ }
+
+ private final String cell1 = "cell1";
+
+ private final String cell2 = "cell2";
+
+ private final String keyspace = "vtdriver2";
+
+ private final String shard = "-80";
+
+ @BeforeClass
+ public static void initPool() {
+ VtHealthCheckExecutorService.initialize(null, null, null, null);
+ VtQueryExecutorService.initialize(null, null, null, null);
+ }
+
+ @Before
+ public void init() throws IOException {
+ grpcCleanup = new GrpcCleanupRule();
+ }
+
+ @After
+ public void resetHealthCheck() {
+ HealthCheck.resetHealthCheck();
+ }
+
+ @Test
+ public void testServing() {
+ // init HealthCheck
+ HealthCheck hc = HealthCheck.INSTANCE;
+ MockTablet mockTablet1 = buildMockTablet(cell1, RandomUtils.nextInt(), "a", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet1.getTablet());
+ MockTablet mockTablet2 = buildMockTablet(cell2, RandomUtils.nextInt(), "b", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY);
+ hc.addTablet(mockTablet2.getTablet());
+ MockTablet mockTablet3 = buildMockTablet(cell1, RandomUtils.nextInt(), "c", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER);
+ hc.addTablet(mockTablet3.getTablet());
+ MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "d", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet4.getTablet());
+
+ sleepMillisSeconds(200);
+ sendOnNextMessage(mockTablet1, mockTablet2, mockTablet3, mockTablet4);
+
+ // check HealthCheck.getTabletHealthChecks
+ List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(2, tabletHealthChecks.size());
+ List tablets = new ArrayList<>();
+ for (TabletHealthCheck tabletHealthCheck : tabletHealthChecks) {
+ tablets.add(tabletHealthCheck.getTablet());
+ }
+ Assert.assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList(mockTablet1.getTablet(), mockTablet4.getTablet()), tablets));
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(2, tabletHealthChecks.size());
+ tablets = new ArrayList<>();
+ for (TabletHealthCheck tabletHealthCheck : tabletHealthChecks) {
+ tablets.add(tabletHealthCheck.getTablet());
+ }
+ Assert.assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList(mockTablet1.getTablet(), mockTablet4.getTablet()), tablets));
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ // close resource
+ closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4);
+ }
+
+ @Test
+ public void testNoServing() {
+ // init HealthCheck
+ HealthCheck hc = HealthCheck.INSTANCE;
+ MockTablet mockTablet1 = buildMockTablet(cell2, RandomUtils.nextInt(), "e", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet1.getTablet());
+ MockTablet mockTablet2 = buildMockTablet(cell2, RandomUtils.nextInt(), "f", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY);
+ hc.addTablet(mockTablet2.getTablet());
+ MockTablet mockTablet3 = buildMockTablet(cell1, RandomUtils.nextInt(), "g", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER);
+ hc.addTablet(mockTablet3.getTablet());
+ MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "h", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet4.getTablet());
+
+ sleepMillisSeconds(200);
+ sendOnNextMessage(mockTablet3);
+
+ // check HealthCheck.getTabletHealthChecks
+ List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType);
+ Assert.assertTrue(CollectionUtils.isEmpty(tabletHealthChecks));
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType);
+ Assert.assertTrue(CollectionUtils.isEmpty(tabletHealthChecks));
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ // close resource
+ closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4);
+ }
+
+ @Test
+ public void testNoServing2() {
+ // init HealthCheck
+ HealthCheck hc = HealthCheck.INSTANCE;
+ MockTablet mockTablet1 = buildMockTablet(cell2, RandomUtils.nextInt(), "i", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet1.getTablet());
+ MockTablet mockTablet2 = buildMockTablet(cell1, RandomUtils.nextInt(), "j", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY);
+ hc.addTablet(mockTablet2.getTablet());
+ MockTablet mockTablet3 = buildMockTablet(cell2, RandomUtils.nextInt(), "k", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER);
+ hc.addTablet(mockTablet3.getTablet());
+ MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "l", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA);
+ hc.addTablet(mockTablet4.getTablet());
+
+ sleepMillisSeconds(200);
+ sendOnNextMessage(mockTablet3, mockTablet2);
+
+ // check HealthCheck.getTabletHealthChecks
+ List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType);
+ Assert.assertNotNull(tabletHealthChecks);
+ Assert.assertEquals(1, tabletHealthChecks.size());
+ Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet());
+
+ // close resource
+ closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4);
+ }
+
+ private void sendOnNextMessage(MockTablet... mockTablets) {
+ for (MockTablet mockTablet : mockTablets) {
+ Query.Target target = createTarget(mockTablet.getTablet().getType());
+ Query.StreamHealthResponse streamHealthResponse = Query.StreamHealthResponse.newBuilder()
+ .setTabletAlias(mockTablet.getTablet().getAlias())
+ .setTarget(target)
+ .setServing(true)
+ .setTabletExternallyReparentedTimestamp(0)
+ .setRealtimeStats(Query.RealtimeStats.newBuilder().setCpuUsage(0.5).setSecondsBehindMaster(0).build())
+ .build();
+ MockQueryServer.HealthCheckMessage message = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Next, streamHealthResponse);
+ try {
+ mockTablet.getHealthMessage().put(message);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ sleepMillisSeconds(200);
+ }
+
+ protected Query.Target createTarget(Topodata.TabletType type) {
+ return Query.Target.newBuilder().setKeyspace(keyspace).setShard(shard).setTabletType(type).build();
+ }
+
+ protected MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) {
+ BlockingQueue healthMessage = new ArrayBlockingQueue<>(10);
+ MockQueryServer queryServer = new MockQueryServer(healthMessage);
+ Server server;
+ try {
+ server = InProcessServerBuilder.forName(hostName).directExecutor().addService(queryServer).build().start();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ grpcCleanup.register(server);
+ ManagedChannel channel = InProcessChannelBuilder.forName(hostName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS)
+ .keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
+ grpcCleanup.register(channel);
+ Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type);
+ IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
+ TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);
+ return new MockTablet(tablet, healthMessage);
+ }
+
+ private Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) {
+ Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build();
+ int defaultMysqlPort = 3358;
+ Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder().setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName)
+ .setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort);
+ for (Map.Entry portEntry : portMap.entrySet()) {
+ tabletBuilder.putPortMap(portEntry.getKey(), portEntry.getValue());
+ }
+ System.out.printf("buildTablet: keyspace:%s, shard:%s, tablet_type:%s, cell:%s, uid:%s\n", keyspaceName, shard, type, cell, uid);
+ return tabletBuilder.build();
+ }
+
+ private void closeQueryService(MockTablet... tablets) {
+ MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null);
+ for (MockTablet tablet : tablets) {
+ try {
+ tablet.getHealthMessage().put(close);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @AllArgsConstructor
+ @Getter
+ private static class MockTablet {
+
+ private final Topodata.Tablet tablet;
+
+ private final BlockingQueue healthMessage;
+ }
+}
diff --git a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java
index 4780323..5f0da01 100644
--- a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java
+++ b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java
@@ -82,9 +82,7 @@ private void subscribe() {
private void notifyAll(HealthCheckMessage message) {
switch (message.getMessageType()) {
case Close:
- observers.forEach(observer -> {
- observer.onCompleted();
- });
+ observers.forEach(StreamObserver::onCompleted);
System.out.println("server: receive an ending message, complete stream connection");
observers.clear();
break;
@@ -103,7 +101,8 @@ private void notifyAll(HealthCheckMessage message) {
e.printStackTrace();
}
});
- System.out.println("server: receive message: " + message.getMessage());
+ System.out.printf("server: receive message: keyspace:%s, shard:%s, tablet_type:%s, cell:%s, uid:%s\n", message.getMessage().getTarget().getKeyspace(), message.getMessage().getTarget().getShard(),
+ message.getMessage().getTarget().getTabletType(), message.getMessage().getTabletAlias().getCell(), message.getMessage().getTabletAlias().getUid());
}
}
diff --git a/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java b/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java
new file mode 100644
index 0000000..59dd795
--- /dev/null
+++ b/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java
@@ -0,0 +1,63 @@
+/*
+Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
+
+Copyright 2019 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package com.jd.jdbc.queryservice.util;
+
+import com.jd.jdbc.queryservice.RoleType;
+import io.vitess.proto.Topodata;
+import java.sql.SQLException;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+public class RoleUtilsTest {
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
+
+ private RoleType rw = new RoleType(Topodata.TabletType.MASTER);
+
+ private RoleType rr = new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
+
+ private RoleType ro = new RoleType(Topodata.TabletType.RDONLY);
+
+ private RoleType rrm = new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
+
+ @Test
+ public void buildRoleTypeTest() throws SQLException {
+ RoleType rrRoleType = RoleUtils.buildRoleType("rr");
+ Assert.assertEquals(rr, rrRoleType);
+ RoleType rwRoleType = RoleUtils.buildRoleType("rw");
+ Assert.assertEquals(rw, rwRoleType);
+ RoleType roRoleType = RoleUtils.buildRoleType("ro");
+ Assert.assertEquals(ro, roRoleType);
+ RoleType rrmRoleType = RoleUtils.buildRoleType("rrm");
+ Assert.assertEquals(rrm, rrmRoleType);
+ }
+
+ @Test
+ public void buildRoleTypeErrorTest() throws SQLException {
+ String role = RandomStringUtils.random(3);
+ thrown.expect(SQLException.class);
+ thrown.expectMessage("'role=" + role + "' " + "error in jdbc url");
+ RoleType roleType = RoleUtils.buildRoleType(role);
+ Assert.fail();
+ }
+}
\ No newline at end of file
diff --git a/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java b/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java
index 49ba3e3..9724f07 100644
--- a/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java
+++ b/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java
@@ -18,6 +18,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -26,12 +27,14 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.FixMethodOrder;
+import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
@@ -52,7 +55,7 @@ public class VitessDriverReadWriteSplit extends TestSuite {
protected Connection roConnection;
- protected Connection rtConnection;
+ protected Connection rrmConnection;
@Before
public void init() throws Exception {
@@ -61,16 +64,12 @@ public void init() throws Exception {
rrConnection = DriverManager.getConnection(baseUrl + "&role=rr");
rwConnection = DriverManager.getConnection(baseUrl + "&role=rw");
roConnection = DriverManager.getConnection(baseUrl + "&role=ro");
+ rrmConnection = DriverManager.getConnection(baseUrl + "&role=rrm");
}
@After
public void close() throws SQLException {
- if (rrConnection != null) {
- rrConnection.close();
- }
- if (rwConnection != null) {
- rwConnection.close();
- }
+ closeConnection(rrConnection, rwConnection, roConnection, rrmConnection);
}
@Test
@@ -188,17 +187,57 @@ public void test12() throws SQLException {
public void test13() throws SQLException {
thrown.expect(SQLException.class);
thrown.expectMessage("error in jdbc url");
- rtConnection = DriverManager.getConnection(baseUrl + "&role=rt");
+ DriverManager.getConnection(baseUrl + "&role=rt");
+ }
+
+ @Test
+ public void test14() throws SQLException {
+ thrown.expect(SQLException.class);
+ thrown.expectMessage("is not allowed for read only connection");
+ try (Statement stmt = rrmConnection.createStatement()) {
+ stmt.executeUpdate("delete from test");
+ }
+ }
+
+ @Test
+ public void test15() throws SQLException {
+ thrown.expect(SQLException.class);
+ thrown.expectMessage("is not allowed for read only connection");
+ try (Statement stmt = rrmConnection.createStatement()) {
+ stmt.executeUpdate("insert into test (f_tinyint,f_int) values(1,2)");
+ }
+ }
+
+ @Test
+ public void test16() throws SQLException {
+ thrown.expect(SQLException.class);
+ thrown.expectMessage("is not allowed for read only connection");
+ try (Statement stmt = rrmConnection.createStatement()) {
+ stmt.executeUpdate("update test set f_int = 100 where f_tinyint = 1");
+ }
+ }
+
+ @Test
+ public void test17() throws SQLException {
+ sleep(10);
+ try (Statement stmt = rrmConnection.createStatement()) {
+ ResultSet resultSet;
+ resultSet = stmt.executeQuery("select f_tinyint,f_int from test where f_tinyint = 1");
+ while (resultSet.next()) {
+ Assert.assertEquals(100, resultSet.getInt(2));
+ }
+ }
}
@Test
public void concurrencyTest() throws InterruptedException {
// 交替使用不同账户执行sql
- final Connection[] conns = new Connection[] {rwConnection, rrConnection, roConnection};
+ final Connection[] conns = new Connection[] {rwConnection, rrConnection, roConnection, rrmConnection};
final String[] sqls = new String[] {
"insert into user (id, name) values (null, '%s')",
"select id, name from user limit 1",
"select id, name from user limit 2",
+ "select id, name from user limit 3",
};
final int numThreads = 4;
final int count = 200;
@@ -207,12 +246,13 @@ public void concurrencyTest() throws InterruptedException {
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
+ AtomicBoolean flag = new AtomicBoolean(true);
CountDownLatch latch = new CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
final int fi = i;
service.execute(() -> {
for (int k = 0; k < count; k++) {
- int randInteger = RandomUtils.nextInt(0, 3);
+ int randInteger = RandomUtils.nextInt(0, conns.length);
String randString = RandomStringUtils.random(10, true, true);
Connection conn = conns[randInteger];
String sql = sqls[randInteger];
@@ -224,6 +264,7 @@ public void concurrencyTest() throws InterruptedException {
printInfo("thread" + fi + ", sql" + k + ": " + sql);
stmt.execute(sql);
} catch (SQLException e) {
+ flag.set(false);
System.out.println(printFail("concurrencyTest: error"));
e.printStackTrace();
}
@@ -233,6 +274,24 @@ public void concurrencyTest() throws InterruptedException {
}
latch.await(180, TimeUnit.SECONDS);
+ Assert.assertTrue(flag.get());
printOk("[OK]");
}
+
+ @Test
+ @Ignore
+ public void multiKeyspaceTest() throws SQLException, InterruptedException {
+ String sql11 = "select * from plan_test limit 10";
+ PreparedStatement ps11 = rrmConnection.prepareStatement(sql11);
+
+ for (int i = 0; i < 100000000; i++) {
+ try {
+ ResultSet resultSet = ps11.executeQuery();
+ printResult(resultSet);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ TimeUnit.SECONDS.sleep(4000);
+ }
}
diff --git a/src/test/java/testsuite/TestSuite.java b/src/test/java/testsuite/TestSuite.java
index c218c10..c41c776 100644
--- a/src/test/java/testsuite/TestSuite.java
+++ b/src/test/java/testsuite/TestSuite.java
@@ -142,6 +142,14 @@ protected String getKeyspace(TestSuiteEnv env) {
return env.getKeyspace();
}
+ public void sleepMillisSeconds(long second) {
+ try {
+ TimeUnit.MILLISECONDS.sleep(second);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
protected String getUser(TestSuiteEnv env) {
return env.getUser();
}