Skip to content

Commit

Permalink
Simplified error information when the metadata directory does not exi…
Browse files Browse the repository at this point in the history
…st, suitable for the cell offline scenario
  • Loading branch information
wlx5575 committed Jul 21, 2023
1 parent 4121b1b commit bb47b3e
Show file tree
Hide file tree
Showing 32 changed files with 206 additions and 319 deletions.
14 changes: 6 additions & 8 deletions src/main/java/com/jd/jdbc/VSchemaManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import vschema.Vschema;

Expand Down Expand Up @@ -58,16 +57,15 @@ public static VSchemaManager getInstance(TopoServer topoServer) {
}

/**
* @param ksSet
*
* @param tabletKeyspace
* @throws TopoException
*/
public void initVschema(Set<String> ksSet) throws TopoException {
for (String ks : ksSet) {
if (this.ksMap.containsKey(ks)) {
continue;
}
this.ksMap.put(ks, this.topoServer.getVschema(null, ks));
public void initVschema(String tabletKeyspace) throws TopoException {
if (this.ksMap.containsKey(tabletKeyspace)) {
return;
}
this.ksMap.put(tabletKeyspace, this.topoServer.getVschema(null, tabletKeyspace));
}

/**
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.SchemaUtil;
import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService;
import io.netty.util.internal.StringUtil;
import io.vitess.proto.Query;
Expand Down Expand Up @@ -122,6 +121,10 @@ public static String keyFromTablet(final Topodata.Tablet tablet) {
return tablet.getKeyspace() + "." + tablet.getShard() + "." + TopoProto.tabletTypeLstring(tablet.getType());
}

public static int getSecondsBehindMaster() {
return HealthCheck.secondsBehindsMaster;
}

public Map<String, TabletHealthCheck> getHealthByAliasCopy() {
return new HashMap<>(healthByAlias);
}
Expand Down Expand Up @@ -411,10 +414,9 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge
}

public void initConnectionPool(final String keyspace) {
String tabletKeyspace = SchemaUtil.getLogicSchema(keyspace);
for (TabletHealthCheck tabletHealthCheck : healthByAlias.values()) {
Topodata.Tablet tablet = tabletHealthCheck.getTablet();
if (tablet == null || !tablet.getKeyspace().equalsIgnoreCase(tabletKeyspace) || !tabletHealthCheck.getServing().get()) {
if (tablet == null || !tablet.getKeyspace().equalsIgnoreCase(keyspace) || !tabletHealthCheck.getServing().get()) {
continue;
}
if (!Objects.equals(Topodata.TabletType.MASTER, tablet.getType())) {
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/com/jd/jdbc/discovery/SecurityCenter.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import lombok.Getter;

public enum SecurityCenter {
/**
* Enum singleton
*/
INSTANCE;

private Map<String, Credential> keySpaceCredentialMap = null;
Expand Down
79 changes: 43 additions & 36 deletions src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
import com.jd.jdbc.monitor.TopologyCollector;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqlparser.utils.StringUtils;
import com.jd.jdbc.topo.TopoException;
import com.jd.jdbc.topo.TopoExceptionCode;
import com.jd.jdbc.topo.TopoServer;
import com.jd.jdbc.topo.TopoTabletInfo;
import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService;
import io.vitess.proto.Topodata;
Expand Down Expand Up @@ -68,11 +68,11 @@ public class TopologyWatcher {

private Map<String, Topodata.Tablet> currentTablets = new ConcurrentHashMap<>(16);

public TopologyWatcher(TopoServer ts, String cell, Set<String> keyspaces) {
public TopologyWatcher(TopoServer ts, String cell, String tabletKeyspace) {
this.ts = ts;
this.hc = HealthCheck.INSTANCE;
this.cell = cell;
this.ksSet.addAll(keyspaces);
this.ksSet.add(tabletKeyspace);
this.firstLoadTabletsFlag = true;
log.info("start topo watcher for cell: " + cell);
}
Expand Down Expand Up @@ -122,11 +122,12 @@ private void connectTablets(Map<String, Topodata.Tablet> newTablets) {
private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
Map<String, Topodata.Tablet> newTablets;
if (firstLoadTabletsFlag) {
List<TopoTabletInfo> topoTabletInfoList;
List<Topodata.Tablet> tabletList;
try {
topoTabletInfoList = ts.getTabletsByRange(ctx, cell);
tabletList = ts.getTabletsByRange(ctx, cell);
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getTabletsByRange error,cause by" + e.getMessage());
return null;
}
log.error("get topoTabletInfo fail", e);
Expand All @@ -136,12 +137,15 @@ private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
return null;
}
newTablets = new HashMap<>();
for (TopoTabletInfo topoTabletInfo : topoTabletInfoList) {
if (!ksSet.contains(topoTabletInfo.getTablet().getKeyspace())) {
ignoreTopo.ignore(topoTabletInfo.getTablet().getKeyspace(), topoTabletInfo.getTablet());
for (Topodata.Tablet tablet : tabletList) {
if (StringUtils.isEmpty(tablet.getKeyspace())) {
continue;
}
newTablets.put(TopoProto.tabletAliasString(topoTabletInfo.getTablet().getAlias()), topoTabletInfo.getTablet());
if (!ksSet.contains(tablet.getKeyspace())) {
ignoreTopo.ignore(tablet.getKeyspace(), tablet);
continue;
}
newTablets.put(TopoProto.tabletAliasString(tablet.getAlias()), tablet);
}
firstLoadTabletsFlag = false;
return newTablets;
Expand All @@ -151,6 +155,10 @@ private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
try {
tablets = ts.getTabletAliasByCell(ctx, cell);
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getTabletAliasByCell error,cause by" + e.getMessage());
return null;
}
log.error("Build Tablets fail,current ksSet=" + ksSet, e);
TopologyCollector.getErrorCounter().labels(cell).inc();
return null;
Expand Down Expand Up @@ -180,14 +188,18 @@ private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
for (Topodata.TabletAlias alias : tablets) {
VtHealthCheckExecutorService.execute(() -> {
try {
TopoTabletInfo topoTabletInfo = ts.getTablet(ctx, alias);
if (!ksSet.contains(topoTabletInfo.getTablet().getKeyspace())) {
ignoreTopo.ignore(topoTabletInfo.getTablet().getKeyspace(), topoTabletInfo.getTablet());
Topodata.Tablet tablet = ts.getTablet(ctx, alias);
if (StringUtils.isEmpty(tablet.getKeyspace())) {
return;
}
if (!ksSet.contains(tablet.getKeyspace())) {
ignoreTopo.ignore(tablet.getKeyspace(), tablet);
return;
}
newTablets.put(TopoProto.tabletAliasString(alias), topoTabletInfo.getTablet());
newTablets.put(TopoProto.tabletAliasString(alias), tablet);
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getTablet error,cause by" + e.getMessage());
return;
}
log.error("get topoTabletInfo fail", e);
Expand Down Expand Up @@ -218,31 +230,26 @@ public void run() {
}, 0, 30000);
}

public void watchKeyspace(IContext ctx, Set<String> keyspace) {
if (ksSet.containsAll(keyspace)) {
public void watchKeyspace(IContext ctx, String tabletKeyspace) {
if (ksSet.contains(tabletKeyspace)) {
return;
}
for (String ks : keyspace) {
if (this.ksSet.contains(ks)) {
continue;
}
log.info("topo watcher for cell " + cell + " watches: " + ks);
this.ksSet.add(ks);
Set<Topodata.Tablet> tablets = this.ignoreTopo.watchKs(ks);
if (CollectionUtils.isEmpty(tablets)) {
return;
}
for (Topodata.Tablet tablet : tablets) {
hc.addTablet(tablet);
}
this.lock.lock();
try {
Map<String, Topodata.Tablet> newTablets = tablets.stream()
.collect(Collectors.toMap(a -> TopoProto.tabletAliasString(a.getAlias()), s -> s, (s1, s2) -> s1));
currentTablets.putAll(newTablets);
} finally {
this.lock.unlock();
}
log.info("topo watcher for cell " + cell + " watches: " + tabletKeyspace);
this.ksSet.add(tabletKeyspace);
Set<Topodata.Tablet> tablets = this.ignoreTopo.watchKs(tabletKeyspace);
if (CollectionUtils.isEmpty(tablets)) {
return;
}
for (Topodata.Tablet tablet : tablets) {
hc.addTablet(tablet);
}
this.lock.lock();
try {
Map<String, Topodata.Tablet> newTablets = tablets.stream()
.collect(Collectors.toMap(a -> TopoProto.tabletAliasString(a.getAlias()), s -> s, (s1, s2) -> s1));
currentTablets.putAll(newTablets);
} finally {
this.lock.unlock();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.topo.TopoServer;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -40,11 +39,11 @@ public enum TopologyWatcherManager {
cellTopologyWatcherMap = new ConcurrentHashMap<>(16);
}

public void startWatch(IContext ctx, TopoServer topoServer, String cell, Set<String> keySpaces) {
public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace) {
lock.lock();
try {
if (!cellTopologyWatcherMap.containsKey(cell)) {
TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keySpaces);
TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace);
topologyWatcher.start(ctx);
cellTopologyWatcherMap.put(cell, topologyWatcher);
}
Expand All @@ -53,11 +52,11 @@ public void startWatch(IContext ctx, TopoServer topoServer, String cell, Set<Str
}
}

public void watch(IContext ctx, String cell, Set<String> keySpaces) {
public void watch(IContext ctx, String cell, String tabletKeyspace) {
if (!cellTopologyWatcherMap.containsKey(cell)) {
throw new RuntimeException("topo watcher for cell " + cell + " is not started");
}
cellTopologyWatcherMap.get(cell).watchKeyspace(ctx, keySpaces);
cellTopologyWatcherMap.get(cell).watchKeyspace(ctx, tabletKeyspace);
}

public void close() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/jd/jdbc/pool/HikariUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.SchemaUtil;
import com.jd.jdbc.util.KeyspaceUtil;
import com.mysql.cj.jdbc.ConnectionImpl;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.metrics.MetricsTrackerFactory;
Expand Down Expand Up @@ -70,7 +70,7 @@ public static HikariConfig getHikariConfig(Topodata.Tablet tablet, String user,

hikariConfig.setDataSourceProperties(dsProperties);

String realSchema = SchemaUtil.getRealSchema(tablet.getKeyspace());
String realSchema = KeyspaceUtil.getRealSchema(tablet.getKeyspace());
hikariConfig.setDriverClassName(Constant.MYSQL_PROTOCOL_DRIVER_CLASS);
String nativeUrl = "jdbc:mysql://" + tablet.getMysqlHostname() + ":" + tablet.getMysqlPort() + "/" + realSchema;
hikariConfig.setJdbcUrl(nativeUrl);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/jd/jdbc/pool/StatefulConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@

package com.jd.jdbc.pool;

import com.jd.jdbc.common.util.MapUtil;
import com.jd.jdbc.discovery.HealthCheck;
import com.jd.jdbc.discovery.SecurityCenter;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.sqlparser.utils.StringUtils;
import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.common.util.MapUtil;
import com.jd.jdbc.vitess.Config;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.pool.HikariPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.jd.jdbc.srvtopo.BindVariable;
import com.jd.jdbc.srvtopo.BoundQuery;
import com.jd.jdbc.vitess.Config;
import com.jd.jdbc.vitess.VitessJdbcProperyUtil;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import io.vitess.proto.Query;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import com.jd.jdbc.srvtopo.BindVariable;
import com.jd.jdbc.srvtopo.BoundQuery;
import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.SchemaUtil;
import com.jd.jdbc.util.KeyspaceUtil;
import com.jd.jdbc.vitess.mysql.VitessPropertyKey;
import io.prometheus.client.Histogram;
import io.vitess.proto.Query;
Expand Down Expand Up @@ -451,7 +451,7 @@ private VtResultSet toVtResultSet(final boolean isQuery, final Statement stateme
Query.Field.Builder fieldBuilder = Query.Field.newBuilder();
Query.Type queryType = VtType.getQueryType(metaData.getColumnTypeName(col));
columnClassNames.add(metaData.getColumnClassName(col));
fieldBuilder.setDatabase(SchemaUtil.getLogicSchema(metaData.getCatalogName(col)))
fieldBuilder.setDatabase(KeyspaceUtil.getLogicSchema(metaData.getCatalogName(col)))
.setTable(metaData.getTableName(col))
.setName(metaData.getColumnLabel(col))
.setOrgName(metaData.getColumnName(col))
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/jd/jdbc/queryservice/StreamIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import com.jd.jdbc.sqltypes.VtResultSet;
import com.jd.jdbc.sqltypes.VtResultValue;
import com.jd.jdbc.sqltypes.VtType;
import com.jd.jdbc.util.SchemaUtil;
import com.jd.jdbc.util.KeyspaceUtil;
import io.vitess.proto.Query;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
Expand Down Expand Up @@ -57,7 +57,7 @@ public boolean hasNext() throws SQLException {
for (int idx = 0, col = 1; idx < cols; idx++, col++) {
Query.Field.Builder fieldBuilder = Query.Field.newBuilder();
Query.Type queryType = VtType.getQueryType(metaData.getColumnTypeName(col));
fieldBuilder.setDatabase(SchemaUtil.getLogicSchema(metaData.getCatalogName(col)))
fieldBuilder.setDatabase(KeyspaceUtil.getLogicSchema(metaData.getCatalogName(col)))
.setJdbcClassName(metaData.getColumnClassName(col))
.setPrecision(metaData.getPrecision(col))
.setIsSigned(metaData.isSigned(col))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlInsertStatement;
import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlUpdateStatement;
import com.jd.jdbc.util.SchemaUtil;
import com.jd.jdbc.util.KeyspaceUtil;
import java.util.List;


Expand Down Expand Up @@ -601,19 +601,19 @@ private void visitArgumentList(final List<SQLExpr> exprList) {
private void replaceSchema(final SQLPropertyExpr tableExpr) {
String keyspaceInSql = tableExpr.getOwnernName();
if (this.defaultKeyspace.equalsIgnoreCase(keyspaceInSql)) {
this.newDefaultKeyspace = SchemaUtil.getRealSchema(this.defaultKeyspace);
this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(this.defaultKeyspace);
} else {
this.newDefaultKeyspace = SchemaUtil.getRealSchema(keyspaceInSql);
this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(keyspaceInSql);
}
this.isReplace = true;
tableExpr.setOwner(this.newDefaultKeyspace);
}

public String getNewDefaultKeyspace() {
if (this.isReplace) {
return SchemaUtil.getLogicSchema(this.newDefaultKeyspace);
return KeyspaceUtil.getLogicSchema(this.newDefaultKeyspace);
}
this.newDefaultKeyspace = SchemaUtil.getRealSchema(this.defaultKeyspace);
return SchemaUtil.getLogicSchema(this.newDefaultKeyspace);
this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(this.defaultKeyspace);
return KeyspaceUtil.getLogicSchema(this.newDefaultKeyspace);
}
}
5 changes: 2 additions & 3 deletions src/main/java/com/jd/jdbc/srvtopo/Gateway.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.jd.jdbc.queryservice.IQueryService;
import io.vitess.proto.Topodata;
import java.util.List;
import java.util.Set;
import lombok.Getter;
import lombok.Setter;

Expand Down Expand Up @@ -59,9 +58,9 @@ public abstract class Gateway {
*
* @param ctx
* @param cell
* @param keyspaceNameList
* @param keyspace
* @param tabletTypeList
* @throws Exception
*/
public abstract void waitForTablets(IContext ctx, String cell, Set<String> keyspaceNameList, List<Topodata.TabletType> tabletTypeList) throws Exception;
public abstract void waitForTablets(IContext ctx, String cell, String keyspace, List<Topodata.TabletType> tabletTypeList) throws Exception;
}
4 changes: 4 additions & 0 deletions src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,10 @@ public void run() {
CurrentShard.setShardReferences(keyspace, shardReferencesList);
SrvKeyspaceCollector.getSrvKeyspaceTaskCounter().labels(keyspace, cell).inc();
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getSrvKeyspace error,cause by" + e.getMessage());
continue;
}
log.error("srvKeyspace-Timer getSrvKeyspace failed for " + cell + "/" + keyspace + ": " + e.getMessage());
SrvKeyspaceCollector.getSrvKeyspaceTaskErrorCounter().labels(keyspace, cell).inc();
} catch (Exception e) {
Expand Down
Loading

0 comments on commit bb47b3e

Please sign in to comment.