diff --git a/docs/properties.md b/docs/properties.md index 7d05da3..1ee479b 100644 --- a/docs/properties.md +++ b/docs/properties.md @@ -6,10 +6,9 @@ | 属性 | 数据类型 | 默认值 | 备注 | |---|---|---|---| -| cell | String | | 应用接入时,传入的cell需考虑跨机房切换,传入多个 | +| cell | String | | 应用接入时,传入的cell需考虑跨机房切换,传入多个 | | deepPaginationThreshold | int | 1000000000 | 用来设置深度分页优化的临界值,超过此参数大小会开启深度分页优化 | -| role | String | rw | 用来配置读写分离,默认role=rw,role=rr时只读 | -| role | String | | role=rr时优先读取replica节点,role=ro时读取rdonly节点 | +| role | String | rw | 用来配置读写分离,默认role=rw
role=rr 优先读replica,replica不可用时读rdonly
role=rrm 优先读replica,replica不可用时读rdonly,rdonly不可用时读取master
role=ro 读rdonly,rdonly不可用时报错。需要注意的是,rdonly节点一般用于大数据抽数和备份,OOM风险高于其他节点| | vtPlanCacheCapacity | int | 300 | 该参数用来设置执行计划缓存cache大小,最大值10240 | | queryConsolidator | boolean | false | 用来开启Consolidator,仅在role=rr场景生效;相同的sql语句只执行一次,其余线程等待第一次查询返回结果后返回 | | queryParallelNum | int | 1 | 在分表场景下,执行事务外的SQL语句时每个分片上可开启的最大并发数 | @@ -22,7 +21,7 @@ | password | String | | 连接时使用的密码。 | | characterEncoding | String | utf8 | 是指定所处理字符的解码和编码的格式,或者说是标准。若项目的字符集和MySQL数据库字符集设置为同一字符集则url可以不加此参数。 | | serverTimezone | String | | 设置时区 | -| socketTimeout | int | 10000 | 查询超时时间,最小值不得小于1000,小于1000时默认设置为1000 | +| socketTimeout | int | 10000 | 查询超时时间,最小值不得小于1000,小于1000时默认设置为1000 | | allowMultiQueries| boolean| true| 在一条语句中,允许使用“;”来分隔多条查询。不可更改,VtDriver强制设置为true| | maxAllowedPacket | byte | 65535(64k) | 设置server接受的数据包的大小 | | zeroDateTimeBehavior | String | exception | JAVA连接MySQL数据库,在操作值为0的timestamp类型时不能正确的处理,而是默认抛出一个异常。参数,exception:默认值;convertToNull:将日期转换成NULL值;round:替换成最近的日期 | @@ -34,15 +33,12 @@ | connectTimeout | long | 0 | 套接字连接的超时(单位为毫秒),0表示无超时 | | useSSL | boolean | false | 在与服务器通信时使用SSL | | useAffectedRows | boolean | false | 当连接到服务器时不要设置“client_found_rows”标签 (这个是不符合JDBC标准的,它会破坏大部分依赖“found”VS DML语句下的”affected”应用程序)。但是会导致“insert”里面的“Correct”更新数据。服务器会返回“ON Duplicate Key update”的状态 | -| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch(), 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 | +| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch(), 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 | ##### 3.线程池参数(内部线程池仅以第一次创建Connection的参数为准) | 属性 | 数据类型 | 默认值 | 备注 | |---|---|---|---| -| daemonCoreSize | int | 10 | Daemon线程池核心线程数 | -| daemonMaximumSize | int | 100 | Daemon线程池最大线程数 | -| daemonRejectedTimeout | long | 3000 | Daemon线程池拒绝任务丢弃超时(毫秒) | | queryCoreSize | int | 当前cpu核心线程数 | 执行SQL线程池核心线程数 | | queryMaximumSize | int | 100 | 执行SQL线程池最大线程数 | | queryQueueSize | int | 1000 | 执行SQL线程池任务队列长度 | @@ -65,12 +61,12 @@ | vtMinimumIdle | int | 5 | 最小连接数(初始连接数) | | vtValidationTimeout | long | 5000 | 此属性控制连接测试活动的最长时间。该值必须小于connectionTimeout。最低可接受的验证超时为250毫秒。 | -*注*:`vtMinimumIdle`和`vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时, 默认值有所不同: +*注*:`vtMinimumIdle`和`vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时, 默认值有所不同: -- 分片数>=8且<16时, `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。 -- 分片数>=16且<32时, `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。 -- 分片数>=32且<64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。 -- 分片数>=64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。 +- 分片数>=8且<16时, `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。 +- 分片数>=16且<32时, `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。 +- 分片数>=32且<64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。 +- 分片数>=64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。 ### VtDriver中的系统参数 通过JVM参数方式传入,比如-Dvtdriver.api.port=9999 diff --git a/pom.xml b/pom.xml index f87f4ff..014b0c1 100644 --- a/pom.xml +++ b/pom.xml @@ -59,6 +59,7 @@ 3.2.10 1.45.1 1.97.1 + 3.2.2 @@ -228,6 +229,12 @@ ${jackson.version} test + + commons-collections + commons-collections + ${commons-collections.version} + test + diff --git a/src/main/java/com/jd/jdbc/common/Constant.java b/src/main/java/com/jd/jdbc/common/Constant.java index 9cd9911..64718ff 100644 --- a/src/main/java/com/jd/jdbc/common/Constant.java +++ b/src/main/java/com/jd/jdbc/common/Constant.java @@ -35,6 +35,8 @@ public class Constant { public static final String DRIVER_PROPERTY_ROLE_RO = "ro"; + public static final String DRIVER_PROPERTY_ROLE_RRM = "rrm"; + public static final String DRIVER_PROPERTY_SCHEMA = "schema"; public static final String DRIVER_PROPERTY_QUERY_CONSOLIDATOR = "queryConsolidator"; diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index cda776a..b645c6f 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -24,6 +24,7 @@ import com.jd.jdbc.monitor.HealthCheckCollector; import com.jd.jdbc.pool.StatefulConnectionPool; import com.jd.jdbc.queryservice.IQueryService; +import com.jd.jdbc.queryservice.RoleType; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.sqlparser.utils.StringUtils; @@ -167,6 +168,25 @@ public List getHealthyTabletStatsMaybeStandby(Query.Target ta return healthyTabletStats; } + public List getTabletHealthChecks(final Query.Target target, final RoleType roleType) { + Query.Target queryTarget = target; + if (target.getShard().isEmpty()) { + queryTarget = target.toBuilder().setShard("0").build(); + } + List tablets = this.getHealthyTabletStats(queryTarget); + if (CollectionUtils.isEmpty(tablets) && Objects.equals(Topodata.TabletType.REPLICA, queryTarget.getTabletType())) { + Topodata.TabletType[] tabletTypes = roleType.getTabletTypes(); + int i = 1; + while (CollectionUtils.isEmpty(tablets) && i < tabletTypes.length) { + Topodata.TabletType tabletType = tabletTypes[i]; + queryTarget = queryTarget.toBuilder().setTabletType(tabletType).build(); + tablets = this.getHealthyTabletStats(queryTarget); + i++; + } + } + return tablets; + } + public List getHealthyTabletStats(Query.Target target) { this.lock.lock(); try { diff --git a/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java b/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java index ed54318..93f677f 100644 --- a/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java +++ b/src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java @@ -21,6 +21,7 @@ import com.jd.jdbc.context.IContext; import com.jd.jdbc.discovery.HealthCheck; import com.jd.jdbc.discovery.TabletHealthCheck; +import com.jd.jdbc.queryservice.util.RoleUtils; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.sqltypes.BatchVtResultSet; @@ -42,7 +43,6 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Set; public class RetryTabletQueryService implements IQueryService, IHealthCheckQueryService { @@ -78,7 +78,7 @@ public Query.CommitResponse commit(IContext context, Query.Target target, Long t throw e; } } - }).responses; + }, RoleUtils.getRoleType(context)).responses; } @Override @@ -97,7 +97,7 @@ public Query.RollbackResponse rollback(IContext context, Query.Target target, Lo throw e; } } - }).responses; + }, RoleUtils.getRoleType(context)).responses; } @Override @@ -107,7 +107,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List log.debug("beginExecute --> " + sql); } - boolean inDedicatedConn = reservedID != 0; + boolean inDedicatedConn = (reservedID != 0); return (BeginVtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> { try { BeginVtResultSet beginVtResultSet = qs.beginExecute(context, target, preQuries, sql, bindVariables, reservedID, options); @@ -123,7 +123,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List throw e; } } - }).resultSetMessage; + }, RoleUtils.getRoleType(context)).resultSetMessage; } @Override @@ -132,7 +132,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma if (log.isDebugEnabled()) { log.debug("execute [" + transactionID + "] --> " + sql); } - boolean inDedicatedConn = reservedID != 0 || transactionID != 0; + boolean inDedicatedConn = (reservedID != 0 || transactionID != 0); return (VtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> { try { VtResultSet ret = qs.execute(context, target, sql, bindVariables, transactionID, reservedID, options); @@ -148,7 +148,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma throw e; } } - }).resultSetMessage; + }, RoleUtils.getRoleType(context)).resultSetMessage; } @Override @@ -185,7 +185,7 @@ public BatchVtResultSet executeBatch(IContext context, Query.Target target, List throw e; } } - }).resultSetMessage; + }, RoleUtils.getRoleType(context)).resultSetMessage; } @Override @@ -202,7 +202,7 @@ public BeginBatchVtResultSet beginExecuteBatch(IContext context, Query.Target ta throw e; } } - }).resultSetMessage; + }, RoleUtils.getRoleType(context)).resultSetMessage; } @Override @@ -230,7 +230,7 @@ public Query.ReleaseResponse release(IContext context, Query.Target target, Long return (Query.ReleaseResponse) retry(inDedicatedConn, target, (IQueryService qs) -> { Query.ReleaseResponse ret = qs.release(context, target, transactionID, reservedID); return new IInner.InnerResult(false, ret); - }).responses; + }, RoleUtils.getRoleType(context)).responses; } @Override @@ -242,7 +242,7 @@ private boolean canRetry(IContext context, Exception e) { return !context.isDone() && (e instanceof SQLRecoverableException); } - private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner) throws SQLException { + private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner, RoleType roleType) throws SQLException { if (inTransaction && target.getTabletType() != Topodata.TabletType.MASTER) { throw new SQLException("gateway's query service can only be used for non-transactional queries on replica and rdonly"); @@ -251,7 +251,7 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn Set invalidTablets = new HashSet<>(); for (int i = 0; i < RETRY_COUNT + 1; i++) { - List typeTablets = getTabletHealthChecks(target); + List typeTablets = ((TabletGateway) gateway).getHc().getTabletHealthChecks(target, roleType); if (typeTablets == null || typeTablets.size() == 0) { throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". no valid tablet"); @@ -272,10 +272,12 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn } } + // not used for replica till now, so use a simple shuffle(). TabletHealthCheck tablet = getRandomTablet(tablets); if (tablet == null) { throw new SQLException("no available connection"); } + if (tablet.getQueryService() == null) { invalidTablets.add(TopoProto.tabletAliasString(tablet.getTablet().getAlias())); continue; @@ -296,28 +298,10 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". all tablets are tried, no available connection"); } - private List getTabletHealthChecks(Query.Target target) { - if (target.getShard().isEmpty()) { - target = target.toBuilder().setShard("0").build(); - } - List typeTablets; - if (Objects.equals(Topodata.TabletType.REPLICA, target.getTabletType())) { - typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target); - if (typeTablets == null || typeTablets.isEmpty()) { - typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target.toBuilder().setTabletType(Topodata.TabletType.RDONLY).build()); - } - } else { - typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target); - } - return typeTablets; - } - private TabletHealthCheck getRandomTablet(List tablets) { - TabletHealthCheck tablet; if (tablets.size() > 1) { Collections.shuffle(tablets); } - tablet = tablets.get(0); - return tablet; + return tablets.get(0); } } \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/queryservice/RoleType.java b/src/main/java/com/jd/jdbc/queryservice/RoleType.java new file mode 100644 index 0000000..da2a971 --- /dev/null +++ b/src/main/java/com/jd/jdbc/queryservice/RoleType.java @@ -0,0 +1,54 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.queryservice; + +import io.vitess.proto.Topodata; +import java.util.Arrays; +import lombok.Getter; + +public class RoleType { + + @Getter + private final Topodata.TabletType[] tabletTypes; + + public RoleType(Topodata.TabletType... tabletTypes) { + this.tabletTypes = tabletTypes; + } + + public Topodata.TabletType getTargetTabletType() { + return tabletTypes[0]; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RoleType roleType = (RoleType) o; + return Arrays.equals(tabletTypes, roleType.tabletTypes); + } + + @Override + public int hashCode() { + return Arrays.hashCode(tabletTypes); + } +} \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java b/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java index 5f3ab51..7c3bc00 100644 --- a/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java +++ b/src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java @@ -18,14 +18,37 @@ import com.jd.jdbc.common.Constant; import com.jd.jdbc.context.IContext; +import com.jd.jdbc.queryservice.RoleType; import io.vitess.proto.Topodata; +import java.sql.SQLException; public class RoleUtils { + public static boolean notMaster(IContext ctx) { return getTabletType(ctx) != Topodata.TabletType.MASTER; } public static Topodata.TabletType getTabletType(IContext ctx) { - return (Topodata.TabletType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY); + RoleType roleType = getRoleType(ctx); + return roleType.getTargetTabletType(); + } + + public static RoleType getRoleType(IContext ctx) { + return (RoleType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY); + } + + public static RoleType buildRoleType(String role) throws SQLException { + switch (role.toLowerCase()) { + case Constant.DRIVER_PROPERTY_ROLE_RW: + return new RoleType(Topodata.TabletType.MASTER); + case Constant.DRIVER_PROPERTY_ROLE_RR: + return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY); + case Constant.DRIVER_PROPERTY_ROLE_RO: + return new RoleType(Topodata.TabletType.RDONLY); + case Constant.DRIVER_PROPERTY_ROLE_RRM: + return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER); + default: + throw new SQLException("'role=" + role + "' " + "error in jdbc url"); + } } } diff --git a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java index 2493523..64ab864 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java @@ -26,6 +26,7 @@ import com.jd.jdbc.pool.InnerConnection; import com.jd.jdbc.pool.StatefulConnectionPool; import com.jd.jdbc.queryservice.IParentQueryService; +import com.jd.jdbc.queryservice.RoleType; import com.jd.jdbc.queryservice.TabletDialer; import com.jd.jdbc.queryservice.util.RoleUtils; import com.jd.jdbc.session.SafeSession; @@ -117,7 +118,7 @@ public VitessConnection(String url, Properties prop, TopoServer topoServer, Reso this.executor = com.jd.jdbc.Executor.getInstance(Utils.getInteger(prop, "vtPlanCacheCapacity")); this.vm = vSchemaManager; this.ctx = VtContext.withCancel(VtContext.background()); - this.ctx.setContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY, VitessJdbcProperyUtil.getTabletType(prop)); + this.ctx.setContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY, getRoleType(prop)); this.ctx.setContextValue(ContextKey.CTX_TOPOSERVER, topoServer); this.ctx.setContextValue(ContextKey.CTX_SCATTER_CONN, resolver.getScatterConn()); this.ctx.setContextValue(ContextKey.CTX_TX_CONN, resolver.getScatterConn().getTxConn()); @@ -495,6 +496,11 @@ public List getActualTables(String logicTableName) { return actualTables; } + private RoleType getRoleType(Properties prop) throws SQLException { + String role = prop.getProperty(Constant.DRIVER_PROPERTY_ROLE_KEY, Constant.DRIVER_PROPERTY_ROLE_RW); + return RoleUtils.buildRoleType(role); + } + public enum ContextKey { CTX_TOPOSERVER, CTX_SCATTER_CONN, diff --git a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java index 0d37d66..6b22654 100644 --- a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java @@ -118,6 +118,7 @@ public static Topodata.TabletType getTabletType(Properties props) throws SQLExce case Constant.DRIVER_PROPERTY_ROLE_RW: return Topodata.TabletType.MASTER; case Constant.DRIVER_PROPERTY_ROLE_RR: + case Constant.DRIVER_PROPERTY_ROLE_RRM: return Topodata.TabletType.REPLICA; case Constant.DRIVER_PROPERTY_ROLE_RO: return Topodata.TabletType.RDONLY; diff --git a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java index 9d2522f..0e5d817 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessStatement.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessStatement.java @@ -72,7 +72,6 @@ import java.sql.SQLException; import java.sql.SQLWarning; import java.util.ArrayList; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index f88d41b..212609d 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -85,7 +85,6 @@ public static void initPool() { HealthCheck.resetHealthCheck(); TabletDialerAgent.clearTabletCache(); TopologyWatcherManager.INSTANCE.close(); - VtHealthCheckExecutorService.initialize(null, null, null, null); VtQueryExecutorService.initialize(null, null, null, null); } @@ -116,7 +115,7 @@ public void resetHealthCheck() { */ @Test - public void testHealthCheck() throws InterruptedException, IOException { + public void testHealthCheck() throws InterruptedException { printComment("1. HealthCheck Test"); printComment("a. Get Health"); @@ -155,7 +154,7 @@ public void testHealthCheck() throws InterruptedException, IOException { * 3. testing if tablet can be remove from healthy when receive error message from tablet server */ @Test - public void testHealthCheckStreamError() throws IOException, InterruptedException { + public void testHealthCheckStreamError() throws InterruptedException { printComment("2. HealthCheck Test for Error Stream Message"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -196,7 +195,7 @@ public void testHealthCheckStreamError() throws IOException, InterruptedExceptio * 3. testing if changing the type of tablet to primary; */ @Test - public void testHealthCheckExternalReparent() throws IOException, InterruptedException { + public void testHealthCheckExternalReparent() throws InterruptedException { printComment("3. HealthCheck Test one tablet External Reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -242,7 +241,7 @@ public void testHealthCheckExternalReparent() throws IOException, InterruptedExc } @Test - public void testHealthCheckTwoExternalReparent() throws IOException, InterruptedException { + public void testHealthCheckTwoExternalReparent() throws InterruptedException { printComment("4. HealthCheck Test two tablets External Reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -296,7 +295,7 @@ public void testHealthCheckTwoExternalReparent() throws IOException, Interrupted } @Test - public void testHealthCheckVerifiesTabletAlias() throws IOException, InterruptedException { + public void testHealthCheckVerifiesTabletAlias() throws InterruptedException { printComment("5. HealthCheck Test receive a mismatch tablet info"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -325,7 +324,7 @@ public void testHealthCheckVerifiesTabletAlias() throws IOException, Interrupted } @Test - public void testHealthCheckRemoveTabletAfterReparent() throws IOException, InterruptedException { + public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedException { printComment("6. HealthCheck Test remove tablet after reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -378,7 +377,7 @@ public void testHealthCheckRemoveTabletAfterReparent() throws IOException, Inter } @Test - public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedException { + public void testHealthCheckOnNextBeforeRemove() throws InterruptedException { printComment("6a. HealthCheck Test onNext before remove tablet"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -413,7 +412,7 @@ public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedE } @Test - public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedException { + public void testHealthCheckOnNextAfterRemove() throws InterruptedException { printComment("6b. HealthCheck Test onNext after remove tablet"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -448,7 +447,7 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx } @Test - public void testHealthCheckTimeout() throws IOException, InterruptedException { + public void testHealthCheckTimeout() throws InterruptedException { printComment("7. HealthCheck Test when health check timeout"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -495,7 +494,7 @@ public void testHealthCheckTimeout() throws IOException, InterruptedException { * @throws InterruptedException */ @Test - public void testGetHealthyTablet() throws IOException, InterruptedException { + public void testGetHealthyTablet() throws InterruptedException { printComment("9. HealthCheck Test the functionality of getHealthyTabletStats"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -628,7 +627,7 @@ public void testGetHealthyTablet() throws IOException, InterruptedException { } @Test - public void testPrimaryInOtherCell() throws TopoException, IOException, InterruptedException { + public void testPrimaryInOtherCell() throws TopoException, InterruptedException { TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); @@ -655,7 +654,7 @@ public void testPrimaryInOtherCell() throws TopoException, IOException, Interrup } @Test - public void testReplicaInOtherCell() throws TopoException, IOException, InterruptedException { + public void testReplicaInOtherCell() throws TopoException, InterruptedException { TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); @@ -696,7 +695,7 @@ public void testReplicaInOtherCell() throws TopoException, IOException, Interrup } @Test - public void testGetStandbyTablet() throws IOException, InterruptedException { + public void testGetStandbyTablet() throws InterruptedException { printComment("12. HealthCheck Test get healthy tablet maybe standby"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -749,7 +748,7 @@ public void testGetStandbyTablet() throws IOException, InterruptedException { } @Test - public void testUnhealthyReplicaAsSecondsBehind() throws IOException, InterruptedException { + public void testUnhealthyReplicaAsSecondsBehind() throws InterruptedException { printComment("13. HealthCheck Test get healthy tablet maybe standby"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -780,7 +779,7 @@ public void testUnhealthyReplicaAsSecondsBehind() throws IOException, Interrupte } @Test - public void testMysqlPort0to3358() throws IOException, InterruptedException { + public void testMysqlPort0to3358() throws InterruptedException { printComment("14. HealthCheck Test in Tablet MySQL port changed from 0 to 3358"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -815,7 +814,7 @@ public void testMysqlPort0to3358() throws IOException, InterruptedException { } @Test - public void testMysqlPort3358to0() throws IOException, InterruptedException { + public void testMysqlPort3358to0() throws InterruptedException { printComment("15. HealthCheck Test in Tablet MySQL port changed from 3358 to 0"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -852,7 +851,7 @@ public void testMysqlPort3358to0() throws IOException, InterruptedException { } @Test - public void testDoubleMaster() throws IOException, InterruptedException { + public void testDoubleMaster() throws InterruptedException { printComment("16. double master one no serving"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -884,7 +883,7 @@ public void testDoubleMaster() throws IOException, InterruptedException { Thread.sleep(200); printComment("e. Modify old master Tablet to no serving"); - sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, false, 0, 0.5, 0); + sendOnErrorMessage(mockTablet); Thread.sleep(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); @@ -931,7 +930,7 @@ public void testHealthyListChecksum() { } @Test - public void testHealthyChecksumSetBehindMaster() throws IOException, InterruptedException { + public void testHealthyChecksumSetBehindMaster() throws InterruptedException { HealthCheck hc = getHealthCheck(); // add tablet String keyInHealthy = "k.s.replica"; @@ -1000,7 +999,7 @@ public void testConcurrentModificationException() throws InterruptedException { } @Test - public void testHealthyConcurrentModificationException() throws InterruptedException, IOException { + public void testHealthyConcurrentModificationException() throws InterruptedException { HealthCheck hc = getHealthCheck(); String keyspace = "k"; @@ -1078,7 +1077,7 @@ private void closeQueryService(MockTablet... tablets) throws InterruptedExceptio } } - private HealthCheck getHealthCheck() { + protected HealthCheck getHealthCheck() { HealthCheck hc = HealthCheck.INSTANCE; Assert.assertEquals(0, hc.getHealthByAliasCopy().size()); Assert.assertEquals(0, hc.getHealthyCopy().size()); @@ -1096,11 +1095,16 @@ private void assertTabletHealthCheck(TabletHealthCheck actualTabletHealthCheck, Assert.assertEquals("Wrong realtime stats", expectStats, actualTabletHealthCheck.getStats()); } - private MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) throws IOException { + protected MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { String serverName = InProcessServerBuilder.generateName(); BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); MockQueryServer queryServer = new MockQueryServer(healthMessage); - Server server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); + Server server = null; + try { + server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); + } catch (IOException e) { + throw new RuntimeException(e); + } grpcCleanup.register(server); @@ -1142,16 +1146,24 @@ private Query.Target createTarget(Topodata.TabletType type) { return Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(type).build(); } - private void sendOnNextMessage(MockTablet mockTablet, Topodata.TabletType type, boolean isServing, int reparentedTimestamp, double cpuUsage, int secondsBehindMaster) throws InterruptedException { + protected void sendOnNextMessage(MockTablet mockTablet, Topodata.TabletType type, boolean isServing, int reparentedTimestamp, double cpuUsage, int secondsBehindMaster) { Query.Target target = createTarget(type); Query.StreamHealthResponse streamHealthResponse = createStreamHealthResponse(mockTablet.getTablet().getAlias(), target, isServing, reparentedTimestamp, cpuUsage, secondsBehindMaster); MockQueryServer.HealthCheckMessage message = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Next, streamHealthResponse); - mockTablet.getHealthMessage().put(message); + try { + mockTablet.getHealthMessage().put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - private void sendOnErrorMessage(MockTablet mockTablet) throws InterruptedException { + protected void sendOnErrorMessage(MockTablet mockTablet) { MockQueryServer.HealthCheckMessage error = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Error, null); - mockTablet.getHealthMessage().put(error); + try { + mockTablet.getHealthMessage().put(error); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } @AllArgsConstructor diff --git a/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java b/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java new file mode 100644 index 0000000..913f951 --- /dev/null +++ b/src/test/java/com/jd/jdbc/discovery/ReadWriteSplitTest.java @@ -0,0 +1,310 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.discovery; + +import com.google.common.collect.Lists; +import com.jd.jdbc.queryservice.CombinedQueryService; +import com.jd.jdbc.queryservice.IParentQueryService; +import com.jd.jdbc.queryservice.MockQueryServer; +import com.jd.jdbc.queryservice.RoleType; +import com.jd.jdbc.queryservice.TabletDialerAgent; +import com.jd.jdbc.queryservice.util.RoleUtils; +import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; +import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.vitess.proto.Query; +import io.vitess.proto.Topodata; +import java.io.IOException; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.apache.commons.collections.CollectionUtils; +import org.apache.commons.lang3.RandomUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import testsuite.TestSuite; + +public class ReadWriteSplitTest extends TestSuite { + private static final Map PORT_MAP = new HashMap<>(); + + private static RoleType rrRoleType; + + private static RoleType rwRoleType; + + private static RoleType roRoleType; + + private static RoleType rrmRoleType; + + @Rule + public GrpcCleanupRule grpcCleanup; + + static { + PORT_MAP.put("vt", 1); + PORT_MAP.put("grpc", 2); + try { + rrRoleType = RoleUtils.buildRoleType("rr"); + rwRoleType = RoleUtils.buildRoleType("rw"); + roRoleType = RoleUtils.buildRoleType("ro"); + rrmRoleType = RoleUtils.buildRoleType("rrm"); + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + } + + private final String cell1 = "cell1"; + + private final String cell2 = "cell2"; + + private final String keyspace = "vtdriver2"; + + private final String shard = "-80"; + + @BeforeClass + public static void initPool() { + VtHealthCheckExecutorService.initialize(null, null, null, null); + VtQueryExecutorService.initialize(null, null, null, null); + } + + @Before + public void init() throws IOException { + grpcCleanup = new GrpcCleanupRule(); + } + + @After + public void resetHealthCheck() { + HealthCheck.resetHealthCheck(); + } + + @Test + public void testServing() { + // init HealthCheck + HealthCheck hc = HealthCheck.INSTANCE; + MockTablet mockTablet1 = buildMockTablet(cell1, RandomUtils.nextInt(), "a", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet1.getTablet()); + MockTablet mockTablet2 = buildMockTablet(cell2, RandomUtils.nextInt(), "b", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY); + hc.addTablet(mockTablet2.getTablet()); + MockTablet mockTablet3 = buildMockTablet(cell1, RandomUtils.nextInt(), "c", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER); + hc.addTablet(mockTablet3.getTablet()); + MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "d", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet4.getTablet()); + + sleepMillisSeconds(200); + sendOnNextMessage(mockTablet1, mockTablet2, mockTablet3, mockTablet4); + + // check HealthCheck.getTabletHealthChecks + List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(2, tabletHealthChecks.size()); + List tablets = new ArrayList<>(); + for (TabletHealthCheck tabletHealthCheck : tabletHealthChecks) { + tablets.add(tabletHealthCheck.getTablet()); + } + Assert.assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList(mockTablet1.getTablet(), mockTablet4.getTablet()), tablets)); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(2, tabletHealthChecks.size()); + tablets = new ArrayList<>(); + for (TabletHealthCheck tabletHealthCheck : tabletHealthChecks) { + tablets.add(tabletHealthCheck.getTablet()); + } + Assert.assertTrue(CollectionUtils.isEqualCollection(Lists.newArrayList(mockTablet1.getTablet(), mockTablet4.getTablet()), tablets)); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet()); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet()); + + // close resource + closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4); + } + + @Test + public void testNoServing() { + // init HealthCheck + HealthCheck hc = HealthCheck.INSTANCE; + MockTablet mockTablet1 = buildMockTablet(cell2, RandomUtils.nextInt(), "e", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet1.getTablet()); + MockTablet mockTablet2 = buildMockTablet(cell2, RandomUtils.nextInt(), "f", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY); + hc.addTablet(mockTablet2.getTablet()); + MockTablet mockTablet3 = buildMockTablet(cell1, RandomUtils.nextInt(), "g", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER); + hc.addTablet(mockTablet3.getTablet()); + MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "h", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet4.getTablet()); + + sleepMillisSeconds(200); + sendOnNextMessage(mockTablet3); + + // check HealthCheck.getTabletHealthChecks + List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType); + Assert.assertTrue(CollectionUtils.isEmpty(tabletHealthChecks)); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType); + Assert.assertTrue(CollectionUtils.isEmpty(tabletHealthChecks)); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet()); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet()); + + // close resource + closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4); + } + + @Test + public void testNoServing2() { + // init HealthCheck + HealthCheck hc = HealthCheck.INSTANCE; + MockTablet mockTablet1 = buildMockTablet(cell2, RandomUtils.nextInt(), "i", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet1.getTablet()); + MockTablet mockTablet2 = buildMockTablet(cell1, RandomUtils.nextInt(), "j", keyspace, shard, PORT_MAP, Topodata.TabletType.RDONLY); + hc.addTablet(mockTablet2.getTablet()); + MockTablet mockTablet3 = buildMockTablet(cell2, RandomUtils.nextInt(), "k", keyspace, shard, PORT_MAP, Topodata.TabletType.MASTER); + hc.addTablet(mockTablet3.getTablet()); + MockTablet mockTablet4 = buildMockTablet(cell2, RandomUtils.nextInt(), "l", keyspace, shard, PORT_MAP, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet4.getTablet()); + + sleepMillisSeconds(200); + sendOnNextMessage(mockTablet3, mockTablet2); + + // check HealthCheck.getTabletHealthChecks + List tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet()); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.RDONLY), roRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet()); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.REPLICA), rrmRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet2.getTablet(), tabletHealthChecks.get(0).getTablet()); + + tabletHealthChecks = hc.getTabletHealthChecks(createTarget(Topodata.TabletType.MASTER), rwRoleType); + Assert.assertNotNull(tabletHealthChecks); + Assert.assertEquals(1, tabletHealthChecks.size()); + Assert.assertEquals(mockTablet3.getTablet(), tabletHealthChecks.get(0).getTablet()); + + // close resource + closeQueryService(mockTablet1, mockTablet2, mockTablet3, mockTablet4); + } + + private void sendOnNextMessage(MockTablet... mockTablets) { + for (MockTablet mockTablet : mockTablets) { + Query.Target target = createTarget(mockTablet.getTablet().getType()); + Query.StreamHealthResponse streamHealthResponse = Query.StreamHealthResponse.newBuilder() + .setTabletAlias(mockTablet.getTablet().getAlias()) + .setTarget(target) + .setServing(true) + .setTabletExternallyReparentedTimestamp(0) + .setRealtimeStats(Query.RealtimeStats.newBuilder().setCpuUsage(0.5).setSecondsBehindMaster(0).build()) + .build(); + MockQueryServer.HealthCheckMessage message = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Next, streamHealthResponse); + try { + mockTablet.getHealthMessage().put(message); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + sleepMillisSeconds(200); + } + + protected Query.Target createTarget(Topodata.TabletType type) { + return Query.Target.newBuilder().setKeyspace(keyspace).setShard(shard).setTabletType(type).build(); + } + + protected MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { + BlockingQueue healthMessage = new ArrayBlockingQueue<>(10); + MockQueryServer queryServer = new MockQueryServer(healthMessage); + Server server; + try { + server = InProcessServerBuilder.forName(hostName).directExecutor().addService(queryServer).build().start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + grpcCleanup.register(server); + ManagedChannel channel = InProcessChannelBuilder.forName(hostName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS) + .keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); + grpcCleanup.register(channel); + Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type); + IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); + TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); + return new MockTablet(tablet, healthMessage); + } + + private Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { + Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build(); + int defaultMysqlPort = 3358; + Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder().setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName) + .setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort); + for (Map.Entry portEntry : portMap.entrySet()) { + tabletBuilder.putPortMap(portEntry.getKey(), portEntry.getValue()); + } + System.out.printf("buildTablet: keyspace:%s, shard:%s, tablet_type:%s, cell:%s, uid:%s\n", keyspaceName, shard, type, cell, uid); + return tabletBuilder.build(); + } + + private void closeQueryService(MockTablet... tablets) { + MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null); + for (MockTablet tablet : tablets) { + try { + tablet.getHealthMessage().put(close); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + @AllArgsConstructor + @Getter + private static class MockTablet { + + private final Topodata.Tablet tablet; + + private final BlockingQueue healthMessage; + } +} diff --git a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java index 4780323..5f0da01 100644 --- a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java +++ b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java @@ -82,9 +82,7 @@ private void subscribe() { private void notifyAll(HealthCheckMessage message) { switch (message.getMessageType()) { case Close: - observers.forEach(observer -> { - observer.onCompleted(); - }); + observers.forEach(StreamObserver::onCompleted); System.out.println("server: receive an ending message, complete stream connection"); observers.clear(); break; @@ -103,7 +101,8 @@ private void notifyAll(HealthCheckMessage message) { e.printStackTrace(); } }); - System.out.println("server: receive message: " + message.getMessage()); + System.out.printf("server: receive message: keyspace:%s, shard:%s, tablet_type:%s, cell:%s, uid:%s\n", message.getMessage().getTarget().getKeyspace(), message.getMessage().getTarget().getShard(), + message.getMessage().getTarget().getTabletType(), message.getMessage().getTabletAlias().getCell(), message.getMessage().getTabletAlias().getUid()); } } diff --git a/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java b/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java new file mode 100644 index 0000000..59dd795 --- /dev/null +++ b/src/test/java/com/jd/jdbc/queryservice/util/RoleUtilsTest.java @@ -0,0 +1,63 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.queryservice.util; + +import com.jd.jdbc.queryservice.RoleType; +import io.vitess.proto.Topodata; +import java.sql.SQLException; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +public class RoleUtilsTest { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private RoleType rw = new RoleType(Topodata.TabletType.MASTER); + + private RoleType rr = new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY); + + private RoleType ro = new RoleType(Topodata.TabletType.RDONLY); + + private RoleType rrm = new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER); + + @Test + public void buildRoleTypeTest() throws SQLException { + RoleType rrRoleType = RoleUtils.buildRoleType("rr"); + Assert.assertEquals(rr, rrRoleType); + RoleType rwRoleType = RoleUtils.buildRoleType("rw"); + Assert.assertEquals(rw, rwRoleType); + RoleType roRoleType = RoleUtils.buildRoleType("ro"); + Assert.assertEquals(ro, roRoleType); + RoleType rrmRoleType = RoleUtils.buildRoleType("rrm"); + Assert.assertEquals(rrm, rrmRoleType); + } + + @Test + public void buildRoleTypeErrorTest() throws SQLException { + String role = RandomStringUtils.random(3); + thrown.expect(SQLException.class); + thrown.expectMessage("'role=" + role + "' " + "error in jdbc url"); + RoleType roleType = RoleUtils.buildRoleType(role); + Assert.fail(); + } +} \ No newline at end of file diff --git a/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java b/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java index 49ba3e3..9724f07 100644 --- a/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java +++ b/src/test/java/com/jd/jdbc/vitess/VitessDriverReadWriteSplit.java @@ -18,6 +18,7 @@ import java.sql.Connection; import java.sql.DriverManager; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -26,12 +27,14 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.FixMethodOrder; +import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -52,7 +55,7 @@ public class VitessDriverReadWriteSplit extends TestSuite { protected Connection roConnection; - protected Connection rtConnection; + protected Connection rrmConnection; @Before public void init() throws Exception { @@ -61,16 +64,12 @@ public void init() throws Exception { rrConnection = DriverManager.getConnection(baseUrl + "&role=rr"); rwConnection = DriverManager.getConnection(baseUrl + "&role=rw"); roConnection = DriverManager.getConnection(baseUrl + "&role=ro"); + rrmConnection = DriverManager.getConnection(baseUrl + "&role=rrm"); } @After public void close() throws SQLException { - if (rrConnection != null) { - rrConnection.close(); - } - if (rwConnection != null) { - rwConnection.close(); - } + closeConnection(rrConnection, rwConnection, roConnection, rrmConnection); } @Test @@ -188,17 +187,57 @@ public void test12() throws SQLException { public void test13() throws SQLException { thrown.expect(SQLException.class); thrown.expectMessage("error in jdbc url"); - rtConnection = DriverManager.getConnection(baseUrl + "&role=rt"); + DriverManager.getConnection(baseUrl + "&role=rt"); + } + + @Test + public void test14() throws SQLException { + thrown.expect(SQLException.class); + thrown.expectMessage("is not allowed for read only connection"); + try (Statement stmt = rrmConnection.createStatement()) { + stmt.executeUpdate("delete from test"); + } + } + + @Test + public void test15() throws SQLException { + thrown.expect(SQLException.class); + thrown.expectMessage("is not allowed for read only connection"); + try (Statement stmt = rrmConnection.createStatement()) { + stmt.executeUpdate("insert into test (f_tinyint,f_int) values(1,2)"); + } + } + + @Test + public void test16() throws SQLException { + thrown.expect(SQLException.class); + thrown.expectMessage("is not allowed for read only connection"); + try (Statement stmt = rrmConnection.createStatement()) { + stmt.executeUpdate("update test set f_int = 100 where f_tinyint = 1"); + } + } + + @Test + public void test17() throws SQLException { + sleep(10); + try (Statement stmt = rrmConnection.createStatement()) { + ResultSet resultSet; + resultSet = stmt.executeQuery("select f_tinyint,f_int from test where f_tinyint = 1"); + while (resultSet.next()) { + Assert.assertEquals(100, resultSet.getInt(2)); + } + } } @Test public void concurrencyTest() throws InterruptedException { // 交替使用不同账户执行sql - final Connection[] conns = new Connection[] {rwConnection, rrConnection, roConnection}; + final Connection[] conns = new Connection[] {rwConnection, rrConnection, roConnection, rrmConnection}; final String[] sqls = new String[] { "insert into user (id, name) values (null, '%s')", "select id, name from user limit 1", "select id, name from user limit 2", + "select id, name from user limit 3", }; final int numThreads = 4; final int count = 200; @@ -207,12 +246,13 @@ public void concurrencyTest() throws InterruptedException { 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); + AtomicBoolean flag = new AtomicBoolean(true); CountDownLatch latch = new CountDownLatch(numThreads); for (int i = 0; i < numThreads; i++) { final int fi = i; service.execute(() -> { for (int k = 0; k < count; k++) { - int randInteger = RandomUtils.nextInt(0, 3); + int randInteger = RandomUtils.nextInt(0, conns.length); String randString = RandomStringUtils.random(10, true, true); Connection conn = conns[randInteger]; String sql = sqls[randInteger]; @@ -224,6 +264,7 @@ public void concurrencyTest() throws InterruptedException { printInfo("thread" + fi + ", sql" + k + ": " + sql); stmt.execute(sql); } catch (SQLException e) { + flag.set(false); System.out.println(printFail("concurrencyTest: error")); e.printStackTrace(); } @@ -233,6 +274,24 @@ public void concurrencyTest() throws InterruptedException { } latch.await(180, TimeUnit.SECONDS); + Assert.assertTrue(flag.get()); printOk("[OK]"); } + + @Test + @Ignore + public void multiKeyspaceTest() throws SQLException, InterruptedException { + String sql11 = "select * from plan_test limit 10"; + PreparedStatement ps11 = rrmConnection.prepareStatement(sql11); + + for (int i = 0; i < 100000000; i++) { + try { + ResultSet resultSet = ps11.executeQuery(); + printResult(resultSet); + } catch (Exception e) { + e.printStackTrace(); + } + } + TimeUnit.SECONDS.sleep(4000); + } } diff --git a/src/test/java/testsuite/TestSuite.java b/src/test/java/testsuite/TestSuite.java index c218c10..c41c776 100644 --- a/src/test/java/testsuite/TestSuite.java +++ b/src/test/java/testsuite/TestSuite.java @@ -142,6 +142,14 @@ protected String getKeyspace(TestSuiteEnv env) { return env.getKeyspace(); } + public void sleepMillisSeconds(long second) { + try { + TimeUnit.MILLISECONDS.sleep(second); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + protected String getUser(TestSuiteEnv env) { return env.getUser(); }