Skip to content

Commit

Permalink
read master where slava not surviving (#128)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jin-2019 authored Aug 11, 2023
1 parent 2dd4c3c commit 94acdaf
Show file tree
Hide file tree
Showing 16 changed files with 632 additions and 89 deletions.
22 changes: 9 additions & 13 deletions docs/properties.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@

| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| cell | String | | 应用接入时,传入的cell需考虑跨机房切换,传入多个 |
| cell | String | | 应用接入时传入的cell需考虑跨机房切换传入多个 |
| deepPaginationThreshold | int | 1000000000 | 用来设置深度分页优化的临界值,超过此参数大小会开启深度分页优化 |
| role | String | rw | 用来配置读写分离,默认role=rw,role=rr时只读 |
| role | String | | role=rr时优先读取replica节点,role=ro时读取rdonly节点 |
| role | String | rw | 用来配置读写分离,默认role=rw <br>role=rr 优先读replica,replica不可用时读rdonly <br>role=rrm 优先读replica,replica不可用时读rdonly,rdonly不可用时读取master <br>role=ro 读rdonly,rdonly不可用时报错。需要注意的是,rdonly节点一般用于大数据抽数和备份,OOM风险高于其他节点|
| vtPlanCacheCapacity | int | 300 | 该参数用来设置执行计划缓存cache大小,最大值10240 |
| queryConsolidator | boolean | false | 用来开启Consolidator,仅在role=rr场景生效;相同的sql语句只执行一次,其余线程等待第一次查询返回结果后返回 |
| queryParallelNum | int | 1 | 在分表场景下,执行事务外的SQL语句时每个分片上可开启的最大并发数 |
Expand All @@ -22,7 +21,7 @@
| password | String | | 连接时使用的密码。 |
| characterEncoding | String | utf8 | 是指定所处理字符的解码和编码的格式,或者说是标准。若项目的字符集和MySQL数据库字符集设置为同一字符集则url可以不加此参数。 |
| serverTimezone | String | | 设置时区 |
| socketTimeout | int | 10000 | 查询超时时间,最小值不得小于1000,小于1000时默认设置为1000 |
| socketTimeout | int | 10000 | 查询超时时间最小值不得小于1000,小于1000时默认设置为1000 |
| allowMultiQueries| boolean| true| 在一条语句中,允许使用“;”来分隔多条查询。不可更改,VtDriver强制设置为true|
| maxAllowedPacket | byte | 65535(64k) | 设置server接受的数据包的大小 |
| zeroDateTimeBehavior | String | exception | JAVA连接MySQL数据库,在操作值为0的timestamp类型时不能正确的处理,而是默认抛出一个异常。参数,exception:默认值;convertToNull:将日期转换成NULL值;round:替换成最近的日期 |
Expand All @@ -34,15 +33,12 @@
| connectTimeout | long | 0 | 套接字连接的超时(单位为毫秒),0表示无超时 |
| useSSL | boolean | false | 在与服务器通信时使用SSL |
| useAffectedRows | boolean | false | 当连接到服务器时不要设置“client_found_rows”标签 (这个是不符合JDBC标准的,它会破坏大部分依赖“found”VS DML语句下的”affected”应用程序)。但是会导致“insert”里面的“Correct”更新数据。服务器会返回“ON Duplicate Key update”的状态 |
| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch(), 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 |
| rewriteBatchedStatements | boolean | false | 针对Statement.executeBatch() 是否使用MultiQuery方式执行。在分表场景下,由于分表底层已开启MultiQuery,不能开启这个参数 |

##### 3.线程池参数(内部线程池仅以第一次创建Connection的参数为准)

| 属性 | 数据类型 | 默认值 | 备注 |
|---|---|---|---|
| daemonCoreSize | int | 10 | Daemon线程池核心线程数 |
| daemonMaximumSize | int | 100 | Daemon线程池最大线程数 |
| daemonRejectedTimeout | long | 3000 | Daemon线程池拒绝任务丢弃超时(毫秒) |
| queryCoreSize | int | 当前cpu核心线程数 | 执行SQL线程池核心线程数 |
| queryMaximumSize | int | 100 | 执行SQL线程池最大线程数 |
| queryQueueSize | int | 1000 | 执行SQL线程池任务队列长度 |
Expand All @@ -65,12 +61,12 @@
| vtMinimumIdle | int | 5 | 最小连接数(初始连接数) |
| vtValidationTimeout | long | 5000 | 此属性控制连接测试活动的最长时间。该值必须小于connectionTimeout。最低可接受的验证超时为250毫秒。 |

**`vtMinimumIdle``vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时, 默认值有所不同:
**`vtMinimumIdle``vtMaximumPoolSize`两个参数同时未指定且分片数大于等于8时 默认值有所不同:

- 分片数>=8且<16时, `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。
- 分片数>=16且<32时, `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。
- 分片数>=32且<64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。
- 分片数>=64时, `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。
- 分片数>=8且<16时 `vtMinimumIdle` / `vtMaximumPoolSize` = 4 / 8。
- 分片数>=16且<32时 `vtMinimumIdle` / `vtMaximumPoolSize` = 3 / 6。
- 分片数>=32且<64时 `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 5。
- 分片数>=64时 `vtMinimumIdle` / `vtMaximumPoolSize` = 2 / 4。

### VtDriver中的系统参数
通过JVM参数方式传入,比如-Dvtdriver.api.port=9999
Expand Down
7 changes: 7 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
<cglib.version>3.2.10</cglib.version>
<grpc.testing.version>1.45.1</grpc.testing.version>
<grpc.mockserver.version>1.97.1</grpc.mockserver.version>
<commons-collections.version>3.2.2</commons-collections.version>
</properties>

<dependencies>
Expand Down Expand Up @@ -228,6 +229,12 @@
<version>${jackson.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>${commons-collections.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/jd/jdbc/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class Constant {

public static final String DRIVER_PROPERTY_ROLE_RO = "ro";

public static final String DRIVER_PROPERTY_ROLE_RRM = "rrm";

public static final String DRIVER_PROPERTY_SCHEMA = "schema";

public static final String DRIVER_PROPERTY_QUERY_CONSOLIDATOR = "queryConsolidator";
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.jd.jdbc.monitor.HealthCheckCollector;
import com.jd.jdbc.pool.StatefulConnectionPool;
import com.jd.jdbc.queryservice.IQueryService;
import com.jd.jdbc.queryservice.RoleType;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqlparser.utils.StringUtils;
Expand Down Expand Up @@ -167,6 +168,25 @@ public List<TabletHealthCheck> getHealthyTabletStatsMaybeStandby(Query.Target ta
return healthyTabletStats;
}

public List<TabletHealthCheck> getTabletHealthChecks(final Query.Target target, final RoleType roleType) {
Query.Target queryTarget = target;
if (target.getShard().isEmpty()) {
queryTarget = target.toBuilder().setShard("0").build();
}
List<TabletHealthCheck> tablets = this.getHealthyTabletStats(queryTarget);
if (CollectionUtils.isEmpty(tablets) && Objects.equals(Topodata.TabletType.REPLICA, queryTarget.getTabletType())) {
Topodata.TabletType[] tabletTypes = roleType.getTabletTypes();
int i = 1;
while (CollectionUtils.isEmpty(tablets) && i < tabletTypes.length) {
Topodata.TabletType tabletType = tabletTypes[i];
queryTarget = queryTarget.toBuilder().setTabletType(tabletType).build();
tablets = this.getHealthyTabletStats(queryTarget);
i++;
}
}
return tablets;
}

public List<TabletHealthCheck> getHealthyTabletStats(Query.Target target) {
this.lock.lock();
try {
Expand Down
46 changes: 15 additions & 31 deletions src/main/java/com/jd/jdbc/queryservice/RetryTabletQueryService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.discovery.HealthCheck;
import com.jd.jdbc.discovery.TabletHealthCheck;
import com.jd.jdbc.queryservice.util.RoleUtils;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqltypes.BatchVtResultSet;
Expand All @@ -42,7 +43,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

public class RetryTabletQueryService implements IQueryService, IHealthCheckQueryService {
Expand Down Expand Up @@ -78,7 +78,7 @@ public Query.CommitResponse commit(IContext context, Query.Target target, Long t
throw e;
}
}
}).responses;
}, RoleUtils.getRoleType(context)).responses;
}

@Override
Expand All @@ -97,7 +97,7 @@ public Query.RollbackResponse rollback(IContext context, Query.Target target, Lo
throw e;
}
}
}).responses;
}, RoleUtils.getRoleType(context)).responses;
}

@Override
Expand All @@ -107,7 +107,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List
log.debug("beginExecute --> " + sql);
}

boolean inDedicatedConn = reservedID != 0;
boolean inDedicatedConn = (reservedID != 0);
return (BeginVtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> {
try {
BeginVtResultSet beginVtResultSet = qs.beginExecute(context, target, preQuries, sql, bindVariables, reservedID, options);
Expand All @@ -123,7 +123,7 @@ public BeginVtResultSet beginExecute(IContext context, Query.Target target, List
throw e;
}
}
}).resultSetMessage;
}, RoleUtils.getRoleType(context)).resultSetMessage;
}

@Override
Expand All @@ -132,7 +132,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma
if (log.isDebugEnabled()) {
log.debug("execute [" + transactionID + "] --> " + sql);
}
boolean inDedicatedConn = reservedID != 0 || transactionID != 0;
boolean inDedicatedConn = (reservedID != 0 || transactionID != 0);
return (VtResultSet) retry(inDedicatedConn, target, (IQueryService qs) -> {
try {
VtResultSet ret = qs.execute(context, target, sql, bindVariables, transactionID, reservedID, options);
Expand All @@ -148,7 +148,7 @@ public VtResultSet execute(IContext context, Query.Target target, String sql, Ma
throw e;
}
}
}).resultSetMessage;
}, RoleUtils.getRoleType(context)).resultSetMessage;
}

@Override
Expand Down Expand Up @@ -185,7 +185,7 @@ public BatchVtResultSet executeBatch(IContext context, Query.Target target, List
throw e;
}
}
}).resultSetMessage;
}, RoleUtils.getRoleType(context)).resultSetMessage;
}

@Override
Expand All @@ -202,7 +202,7 @@ public BeginBatchVtResultSet beginExecuteBatch(IContext context, Query.Target ta
throw e;
}
}
}).resultSetMessage;
}, RoleUtils.getRoleType(context)).resultSetMessage;
}

@Override
Expand Down Expand Up @@ -230,7 +230,7 @@ public Query.ReleaseResponse release(IContext context, Query.Target target, Long
return (Query.ReleaseResponse) retry(inDedicatedConn, target, (IQueryService qs) -> {
Query.ReleaseResponse ret = qs.release(context, target, transactionID, reservedID);
return new IInner.InnerResult(false, ret);
}).responses;
}, RoleUtils.getRoleType(context)).responses;
}

@Override
Expand All @@ -242,7 +242,7 @@ private boolean canRetry(IContext context, Exception e) {
return !context.isDone() && (e instanceof SQLRecoverableException);
}

private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner) throws SQLException {
private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IInner inner, RoleType roleType) throws SQLException {

if (inTransaction && target.getTabletType() != Topodata.TabletType.MASTER) {
throw new SQLException("gateway's query service can only be used for non-transactional queries on replica and rdonly");
Expand All @@ -251,7 +251,7 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
Set<String> invalidTablets = new HashSet<>();

for (int i = 0; i < RETRY_COUNT + 1; i++) {
List<TabletHealthCheck> typeTablets = getTabletHealthChecks(target);
List<TabletHealthCheck> typeTablets = ((TabletGateway) gateway).getHc().getTabletHealthChecks(target, roleType);

if (typeTablets == null || typeTablets.size() == 0) {
throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". no valid tablet");
Expand All @@ -272,10 +272,12 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
}
}

// not used for replica till now, so use a simple shuffle().
TabletHealthCheck tablet = getRandomTablet(tablets);
if (tablet == null) {
throw new SQLException("no available connection");
}

if (tablet.getQueryService() == null) {
invalidTablets.add(TopoProto.tabletAliasString(tablet.getTablet().getAlias()));
continue;
Expand All @@ -296,28 +298,10 @@ private IInner.InnerResult retry(boolean inTransaction, Query.Target target, IIn
throw new SQLException("target: " + HealthCheck.keyFromTarget(target) + ". all tablets are tried, no available connection");
}

private List<TabletHealthCheck> getTabletHealthChecks(Query.Target target) {
if (target.getShard().isEmpty()) {
target = target.toBuilder().setShard("0").build();
}
List<TabletHealthCheck> typeTablets;
if (Objects.equals(Topodata.TabletType.REPLICA, target.getTabletType())) {
typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target);
if (typeTablets == null || typeTablets.isEmpty()) {
typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target.toBuilder().setTabletType(Topodata.TabletType.RDONLY).build());
}
} else {
typeTablets = ((TabletGateway) gateway).getHc().getHealthyTabletStats(target);
}
return typeTablets;
}

private TabletHealthCheck getRandomTablet(List<TabletHealthCheck> tablets) {
TabletHealthCheck tablet;
if (tablets.size() > 1) {
Collections.shuffle(tablets);
}
tablet = tablets.get(0);
return tablet;
return tablets.get(0);
}
}
54 changes: 54 additions & 0 deletions src/main/java/com/jd/jdbc/queryservice/RoleType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.queryservice;

import io.vitess.proto.Topodata;
import java.util.Arrays;
import lombok.Getter;

public class RoleType {

@Getter
private final Topodata.TabletType[] tabletTypes;

public RoleType(Topodata.TabletType... tabletTypes) {
this.tabletTypes = tabletTypes;
}

public Topodata.TabletType getTargetTabletType() {
return tabletTypes[0];
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
RoleType roleType = (RoleType) o;
return Arrays.equals(tabletTypes, roleType.tabletTypes);
}

@Override
public int hashCode() {
return Arrays.hashCode(tabletTypes);
}
}
25 changes: 24 additions & 1 deletion src/main/java/com/jd/jdbc/queryservice/util/RoleUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,37 @@

import com.jd.jdbc.common.Constant;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.queryservice.RoleType;
import io.vitess.proto.Topodata;
import java.sql.SQLException;

public class RoleUtils {

public static boolean notMaster(IContext ctx) {
return getTabletType(ctx) != Topodata.TabletType.MASTER;
}

public static Topodata.TabletType getTabletType(IContext ctx) {
return (Topodata.TabletType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY);
RoleType roleType = getRoleType(ctx);
return roleType.getTargetTabletType();
}

public static RoleType getRoleType(IContext ctx) {
return (RoleType) ctx.getContextValue(Constant.DRIVER_PROPERTY_ROLE_KEY);
}

public static RoleType buildRoleType(String role) throws SQLException {
switch (role.toLowerCase()) {
case Constant.DRIVER_PROPERTY_ROLE_RW:
return new RoleType(Topodata.TabletType.MASTER);
case Constant.DRIVER_PROPERTY_ROLE_RR:
return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
case Constant.DRIVER_PROPERTY_ROLE_RO:
return new RoleType(Topodata.TabletType.RDONLY);
case Constant.DRIVER_PROPERTY_ROLE_RRM:
return new RoleType(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
default:
throw new SQLException("'role=" + role + "' " + "error in jdbc url");
}
}
}
Loading

0 comments on commit 94acdaf

Please sign in to comment.