From bb47b3e907c406acc0e7a8c59d36905bd8ac16e2 Mon Sep 17 00:00:00 2001 From: wlx5575 Date: Tue, 18 Jul 2023 18:24:06 +0800 Subject: [PATCH] Simplified error information when the metadata directory does not exist, suitable for the cell offline scenario --- src/main/java/com/jd/jdbc/VSchemaManager.java | 14 +-- .../com/jd/jdbc/discovery/HealthCheck.java | 8 +- .../com/jd/jdbc/discovery/SecurityCenter.java | 3 + .../jd/jdbc/discovery/TopologyWatcher.java | 79 ++++++------ .../discovery/TopologyWatcherManager.java | 9 +- .../java/com/jd/jdbc/pool/HikariUtil.java | 4 +- .../jd/jdbc/pool/StatefulConnectionPool.java | 2 +- .../queryservice/CombinedQueryService.java | 1 - .../jdbc/queryservice/NativeQueryService.java | 4 +- .../jd/jdbc/queryservice/StreamIterator.java | 4 +- .../mysql/visitor/VtChangeSchemaVisitor.java | 12 +- .../java/com/jd/jdbc/srvtopo/Gateway.java | 5 +- .../com/jd/jdbc/srvtopo/ResilientServer.java | 4 + .../java/com/jd/jdbc/srvtopo/SrvTopo.java | 116 +++++++----------- .../com/jd/jdbc/srvtopo/TabletGateway.java | 35 ++---- .../java/com/jd/jdbc/topo/TopoServer.java | 33 ++--- .../java/com/jd/jdbc/topo/TopoTablet.java | 6 +- .../java/com/jd/jdbc/topo/TopoTabletInfo.java | 57 --------- .../jdbc/topo/etcd2topo/Etcd2TopoServer.java | 3 - .../{SchemaUtil.java => KeyspaceUtil.java} | 7 +- .../com/jd/jdbc/vitess/VitessConnection.java | 11 +- .../java/com/jd/jdbc/vitess/VitessDriver.java | 16 +-- .../metadata/VitessDatabaseMetaData.java | 6 +- .../resultset/DatabaseMetaDataResultSet.java | 4 +- .../jd/jdbc/discovery/HealthCheckTest.java | 8 +- .../com/jd/jdbc/key/BinaryHashUtilTest.java | 22 +++- .../jd/jdbc/sqlparser/ChangeSchemaTest.java | 11 +- .../jd/jdbc/topo/etcd2topo/ServerTest.java | 10 +- .../jd/jdbc/util/InnerConnectionPoolUtil.java | 2 +- src/test/java/testsuite/TestSuite.java | 19 ++- .../internal/environment/DriverEnv.java | 3 +- .../internal/environment/TestSuiteEnv.java | 7 +- 32 files changed, 206 insertions(+), 319 deletions(-) delete mode 100644 src/main/java/com/jd/jdbc/topo/TopoTabletInfo.java rename src/main/java/com/jd/jdbc/util/{SchemaUtil.java => KeyspaceUtil.java} (97%) diff --git a/src/main/java/com/jd/jdbc/VSchemaManager.java b/src/main/java/com/jd/jdbc/VSchemaManager.java index 16c9db4..8ea8161 100644 --- a/src/main/java/com/jd/jdbc/VSchemaManager.java +++ b/src/main/java/com/jd/jdbc/VSchemaManager.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import vschema.Vschema; @@ -58,16 +57,15 @@ public static VSchemaManager getInstance(TopoServer topoServer) { } /** - * @param ksSet + * + * @param tabletKeyspace * @throws TopoException */ - public void initVschema(Set ksSet) throws TopoException { - for (String ks : ksSet) { - if (this.ksMap.containsKey(ks)) { - continue; - } - this.ksMap.put(ks, this.topoServer.getVschema(null, ks)); + public void initVschema(String tabletKeyspace) throws TopoException { + if (this.ksMap.containsKey(tabletKeyspace)) { + return; } + this.ksMap.put(tabletKeyspace, this.topoServer.getVschema(null, tabletKeyspace)); } /** diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index 53f8524..5e110e2 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -26,7 +26,6 @@ import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.topo.topoproto.TopoProto; -import com.jd.jdbc.util.SchemaUtil; import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; import io.netty.util.internal.StringUtil; import io.vitess.proto.Query; @@ -122,6 +121,10 @@ public static String keyFromTablet(final Topodata.Tablet tablet) { return tablet.getKeyspace() + "." + tablet.getShard() + "." + TopoProto.tabletTypeLstring(tablet.getType()); } + public static int getSecondsBehindMaster() { + return HealthCheck.secondsBehindsMaster; + } + public Map getHealthByAliasCopy() { return new HashMap<>(healthByAlias); } @@ -411,10 +414,9 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge } public void initConnectionPool(final String keyspace) { - String tabletKeyspace = SchemaUtil.getLogicSchema(keyspace); for (TabletHealthCheck tabletHealthCheck : healthByAlias.values()) { Topodata.Tablet tablet = tabletHealthCheck.getTablet(); - if (tablet == null || !tablet.getKeyspace().equalsIgnoreCase(tabletKeyspace) || !tabletHealthCheck.getServing().get()) { + if (tablet == null || !tablet.getKeyspace().equalsIgnoreCase(keyspace) || !tabletHealthCheck.getServing().get()) { continue; } if (!Objects.equals(Topodata.TabletType.MASTER, tablet.getType())) { diff --git a/src/main/java/com/jd/jdbc/discovery/SecurityCenter.java b/src/main/java/com/jd/jdbc/discovery/SecurityCenter.java index 043d695..5c6b19e 100644 --- a/src/main/java/com/jd/jdbc/discovery/SecurityCenter.java +++ b/src/main/java/com/jd/jdbc/discovery/SecurityCenter.java @@ -27,6 +27,9 @@ import lombok.Getter; public enum SecurityCenter { + /** + * Enum singleton + */ INSTANCE; private Map keySpaceCredentialMap = null; diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java index 94a5c7d..be58d52 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java @@ -24,10 +24,10 @@ import com.jd.jdbc.monitor.TopologyCollector; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; +import com.jd.jdbc.sqlparser.utils.StringUtils; import com.jd.jdbc.topo.TopoException; import com.jd.jdbc.topo.TopoExceptionCode; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.topo.TopoTabletInfo; import com.jd.jdbc.topo.topoproto.TopoProto; import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; import io.vitess.proto.Topodata; @@ -68,11 +68,11 @@ public class TopologyWatcher { private Map currentTablets = new ConcurrentHashMap<>(16); - public TopologyWatcher(TopoServer ts, String cell, Set keyspaces) { + public TopologyWatcher(TopoServer ts, String cell, String tabletKeyspace) { this.ts = ts; this.hc = HealthCheck.INSTANCE; this.cell = cell; - this.ksSet.addAll(keyspaces); + this.ksSet.add(tabletKeyspace); this.firstLoadTabletsFlag = true; log.info("start topo watcher for cell: " + cell); } @@ -122,11 +122,12 @@ private void connectTablets(Map newTablets) { private Map getTopoTabletInfoMap(IContext ctx) { Map newTablets; if (firstLoadTabletsFlag) { - List topoTabletInfoList; + List tabletList; try { - topoTabletInfoList = ts.getTabletsByRange(ctx, cell); + tabletList = ts.getTabletsByRange(ctx, cell); } catch (TopoException e) { if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { + log.warn("getTabletsByRange error,cause by" + e.getMessage()); return null; } log.error("get topoTabletInfo fail", e); @@ -136,12 +137,15 @@ private Map getTopoTabletInfoMap(IContext ctx) { return null; } newTablets = new HashMap<>(); - for (TopoTabletInfo topoTabletInfo : topoTabletInfoList) { - if (!ksSet.contains(topoTabletInfo.getTablet().getKeyspace())) { - ignoreTopo.ignore(topoTabletInfo.getTablet().getKeyspace(), topoTabletInfo.getTablet()); + for (Topodata.Tablet tablet : tabletList) { + if (StringUtils.isEmpty(tablet.getKeyspace())) { continue; } - newTablets.put(TopoProto.tabletAliasString(topoTabletInfo.getTablet().getAlias()), topoTabletInfo.getTablet()); + if (!ksSet.contains(tablet.getKeyspace())) { + ignoreTopo.ignore(tablet.getKeyspace(), tablet); + continue; + } + newTablets.put(TopoProto.tabletAliasString(tablet.getAlias()), tablet); } firstLoadTabletsFlag = false; return newTablets; @@ -151,6 +155,10 @@ private Map getTopoTabletInfoMap(IContext ctx) { try { tablets = ts.getTabletAliasByCell(ctx, cell); } catch (TopoException e) { + if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { + log.warn("getTabletAliasByCell error,cause by" + e.getMessage()); + return null; + } log.error("Build Tablets fail,current ksSet=" + ksSet, e); TopologyCollector.getErrorCounter().labels(cell).inc(); return null; @@ -180,14 +188,18 @@ private Map getTopoTabletInfoMap(IContext ctx) { for (Topodata.TabletAlias alias : tablets) { VtHealthCheckExecutorService.execute(() -> { try { - TopoTabletInfo topoTabletInfo = ts.getTablet(ctx, alias); - if (!ksSet.contains(topoTabletInfo.getTablet().getKeyspace())) { - ignoreTopo.ignore(topoTabletInfo.getTablet().getKeyspace(), topoTabletInfo.getTablet()); + Topodata.Tablet tablet = ts.getTablet(ctx, alias); + if (StringUtils.isEmpty(tablet.getKeyspace())) { + return; + } + if (!ksSet.contains(tablet.getKeyspace())) { + ignoreTopo.ignore(tablet.getKeyspace(), tablet); return; } - newTablets.put(TopoProto.tabletAliasString(alias), topoTabletInfo.getTablet()); + newTablets.put(TopoProto.tabletAliasString(alias), tablet); } catch (TopoException e) { if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { + log.warn("getTablet error,cause by" + e.getMessage()); return; } log.error("get topoTabletInfo fail", e); @@ -218,31 +230,26 @@ public void run() { }, 0, 30000); } - public void watchKeyspace(IContext ctx, Set keyspace) { - if (ksSet.containsAll(keyspace)) { + public void watchKeyspace(IContext ctx, String tabletKeyspace) { + if (ksSet.contains(tabletKeyspace)) { return; } - for (String ks : keyspace) { - if (this.ksSet.contains(ks)) { - continue; - } - log.info("topo watcher for cell " + cell + " watches: " + ks); - this.ksSet.add(ks); - Set tablets = this.ignoreTopo.watchKs(ks); - if (CollectionUtils.isEmpty(tablets)) { - return; - } - for (Topodata.Tablet tablet : tablets) { - hc.addTablet(tablet); - } - this.lock.lock(); - try { - Map newTablets = tablets.stream() - .collect(Collectors.toMap(a -> TopoProto.tabletAliasString(a.getAlias()), s -> s, (s1, s2) -> s1)); - currentTablets.putAll(newTablets); - } finally { - this.lock.unlock(); - } + log.info("topo watcher for cell " + cell + " watches: " + tabletKeyspace); + this.ksSet.add(tabletKeyspace); + Set tablets = this.ignoreTopo.watchKs(tabletKeyspace); + if (CollectionUtils.isEmpty(tablets)) { + return; + } + for (Topodata.Tablet tablet : tablets) { + hc.addTablet(tablet); + } + this.lock.lock(); + try { + Map newTablets = tablets.stream() + .collect(Collectors.toMap(a -> TopoProto.tabletAliasString(a.getAlias()), s -> s, (s1, s2) -> s1)); + currentTablets.putAll(newTablets); + } finally { + this.lock.unlock(); } } diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java index be2ac77..bcff6d5 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java @@ -21,7 +21,6 @@ import com.jd.jdbc.context.IContext; import com.jd.jdbc.topo.TopoServer; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -40,11 +39,11 @@ public enum TopologyWatcherManager { cellTopologyWatcherMap = new ConcurrentHashMap<>(16); } - public void startWatch(IContext ctx, TopoServer topoServer, String cell, Set keySpaces) { + public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace) { lock.lock(); try { if (!cellTopologyWatcherMap.containsKey(cell)) { - TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keySpaces); + TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace); topologyWatcher.start(ctx); cellTopologyWatcherMap.put(cell, topologyWatcher); } @@ -53,11 +52,11 @@ public void startWatch(IContext ctx, TopoServer topoServer, String cell, Set keySpaces) { + public void watch(IContext ctx, String cell, String tabletKeyspace) { if (!cellTopologyWatcherMap.containsKey(cell)) { throw new RuntimeException("topo watcher for cell " + cell + " is not started"); } - cellTopologyWatcherMap.get(cell).watchKeyspace(ctx, keySpaces); + cellTopologyWatcherMap.get(cell).watchKeyspace(ctx, tabletKeyspace); } public void close() { diff --git a/src/main/java/com/jd/jdbc/pool/HikariUtil.java b/src/main/java/com/jd/jdbc/pool/HikariUtil.java index d175d51..32a71e3 100644 --- a/src/main/java/com/jd/jdbc/pool/HikariUtil.java +++ b/src/main/java/com/jd/jdbc/pool/HikariUtil.java @@ -22,7 +22,7 @@ import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.topo.topoproto.TopoProto; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import com.mysql.cj.jdbc.ConnectionImpl; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.metrics.MetricsTrackerFactory; @@ -70,7 +70,7 @@ public static HikariConfig getHikariConfig(Topodata.Tablet tablet, String user, hikariConfig.setDataSourceProperties(dsProperties); - String realSchema = SchemaUtil.getRealSchema(tablet.getKeyspace()); + String realSchema = KeyspaceUtil.getRealSchema(tablet.getKeyspace()); hikariConfig.setDriverClassName(Constant.MYSQL_PROTOCOL_DRIVER_CLASS); String nativeUrl = "jdbc:mysql://" + tablet.getMysqlHostname() + ":" + tablet.getMysqlPort() + "/" + realSchema; hikariConfig.setJdbcUrl(nativeUrl); diff --git a/src/main/java/com/jd/jdbc/pool/StatefulConnectionPool.java b/src/main/java/com/jd/jdbc/pool/StatefulConnectionPool.java index 1c6a17e..e9848be 100644 --- a/src/main/java/com/jd/jdbc/pool/StatefulConnectionPool.java +++ b/src/main/java/com/jd/jdbc/pool/StatefulConnectionPool.java @@ -18,13 +18,13 @@ package com.jd.jdbc.pool; +import com.jd.jdbc.common.util.MapUtil; import com.jd.jdbc.discovery.HealthCheck; import com.jd.jdbc.discovery.SecurityCenter; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.sqlparser.utils.StringUtils; import com.jd.jdbc.topo.topoproto.TopoProto; -import com.jd.jdbc.common.util.MapUtil; import com.jd.jdbc.vitess.Config; import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.pool.HikariPool; diff --git a/src/main/java/com/jd/jdbc/queryservice/CombinedQueryService.java b/src/main/java/com/jd/jdbc/queryservice/CombinedQueryService.java index 63c1e23..aa89676 100644 --- a/src/main/java/com/jd/jdbc/queryservice/CombinedQueryService.java +++ b/src/main/java/com/jd/jdbc/queryservice/CombinedQueryService.java @@ -28,7 +28,6 @@ import com.jd.jdbc.srvtopo.BindVariable; import com.jd.jdbc.srvtopo.BoundQuery; import com.jd.jdbc.vitess.Config; -import com.jd.jdbc.vitess.VitessJdbcProperyUtil; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import io.vitess.proto.Query; diff --git a/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java b/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java index f9b584d..f3ddb85 100644 --- a/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java +++ b/src/main/java/com/jd/jdbc/queryservice/NativeQueryService.java @@ -38,7 +38,7 @@ import com.jd.jdbc.srvtopo.BindVariable; import com.jd.jdbc.srvtopo.BoundQuery; import com.jd.jdbc.topo.topoproto.TopoProto; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import com.jd.jdbc.vitess.mysql.VitessPropertyKey; import io.prometheus.client.Histogram; import io.vitess.proto.Query; @@ -451,7 +451,7 @@ private VtResultSet toVtResultSet(final boolean isQuery, final Statement stateme Query.Field.Builder fieldBuilder = Query.Field.newBuilder(); Query.Type queryType = VtType.getQueryType(metaData.getColumnTypeName(col)); columnClassNames.add(metaData.getColumnClassName(col)); - fieldBuilder.setDatabase(SchemaUtil.getLogicSchema(metaData.getCatalogName(col))) + fieldBuilder.setDatabase(KeyspaceUtil.getLogicSchema(metaData.getCatalogName(col))) .setTable(metaData.getTableName(col)) .setName(metaData.getColumnLabel(col)) .setOrgName(metaData.getColumnName(col)) diff --git a/src/main/java/com/jd/jdbc/queryservice/StreamIterator.java b/src/main/java/com/jd/jdbc/queryservice/StreamIterator.java index 565f518..a01f342 100644 --- a/src/main/java/com/jd/jdbc/queryservice/StreamIterator.java +++ b/src/main/java/com/jd/jdbc/queryservice/StreamIterator.java @@ -23,7 +23,7 @@ import com.jd.jdbc.sqltypes.VtResultSet; import com.jd.jdbc.sqltypes.VtResultValue; import com.jd.jdbc.sqltypes.VtType; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import io.vitess.proto.Query; import java.sql.ResultSet; import java.sql.ResultSetMetaData; @@ -57,7 +57,7 @@ public boolean hasNext() throws SQLException { for (int idx = 0, col = 1; idx < cols; idx++, col++) { Query.Field.Builder fieldBuilder = Query.Field.newBuilder(); Query.Type queryType = VtType.getQueryType(metaData.getColumnTypeName(col)); - fieldBuilder.setDatabase(SchemaUtil.getLogicSchema(metaData.getCatalogName(col))) + fieldBuilder.setDatabase(KeyspaceUtil.getLogicSchema(metaData.getCatalogName(col))) .setJdbcClassName(metaData.getColumnClassName(col)) .setPrecision(metaData.getPrecision(col)) .setIsSigned(metaData.isSigned(col)) diff --git a/src/main/java/com/jd/jdbc/sqlparser/dialect/mysql/visitor/VtChangeSchemaVisitor.java b/src/main/java/com/jd/jdbc/sqlparser/dialect/mysql/visitor/VtChangeSchemaVisitor.java index 45ac006..3eb08f9 100644 --- a/src/main/java/com/jd/jdbc/sqlparser/dialect/mysql/visitor/VtChangeSchemaVisitor.java +++ b/src/main/java/com/jd/jdbc/sqlparser/dialect/mysql/visitor/VtChangeSchemaVisitor.java @@ -43,7 +43,7 @@ import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlInsertStatement; import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlSelectQueryBlock; import com.jd.jdbc.sqlparser.dialect.mysql.ast.statement.MySqlUpdateStatement; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import java.util.List; @@ -601,9 +601,9 @@ private void visitArgumentList(final List exprList) { private void replaceSchema(final SQLPropertyExpr tableExpr) { String keyspaceInSql = tableExpr.getOwnernName(); if (this.defaultKeyspace.equalsIgnoreCase(keyspaceInSql)) { - this.newDefaultKeyspace = SchemaUtil.getRealSchema(this.defaultKeyspace); + this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(this.defaultKeyspace); } else { - this.newDefaultKeyspace = SchemaUtil.getRealSchema(keyspaceInSql); + this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(keyspaceInSql); } this.isReplace = true; tableExpr.setOwner(this.newDefaultKeyspace); @@ -611,9 +611,9 @@ private void replaceSchema(final SQLPropertyExpr tableExpr) { public String getNewDefaultKeyspace() { if (this.isReplace) { - return SchemaUtil.getLogicSchema(this.newDefaultKeyspace); + return KeyspaceUtil.getLogicSchema(this.newDefaultKeyspace); } - this.newDefaultKeyspace = SchemaUtil.getRealSchema(this.defaultKeyspace); - return SchemaUtil.getLogicSchema(this.newDefaultKeyspace); + this.newDefaultKeyspace = KeyspaceUtil.getRealSchema(this.defaultKeyspace); + return KeyspaceUtil.getLogicSchema(this.newDefaultKeyspace); } } diff --git a/src/main/java/com/jd/jdbc/srvtopo/Gateway.java b/src/main/java/com/jd/jdbc/srvtopo/Gateway.java index 3c37ce3..ce0671f 100644 --- a/src/main/java/com/jd/jdbc/srvtopo/Gateway.java +++ b/src/main/java/com/jd/jdbc/srvtopo/Gateway.java @@ -22,7 +22,6 @@ import com.jd.jdbc.queryservice.IQueryService; import io.vitess.proto.Topodata; import java.util.List; -import java.util.Set; import lombok.Getter; import lombok.Setter; @@ -59,9 +58,9 @@ public abstract class Gateway { * * @param ctx * @param cell - * @param keyspaceNameList + * @param keyspace * @param tabletTypeList * @throws Exception */ - public abstract void waitForTablets(IContext ctx, String cell, Set keyspaceNameList, List tabletTypeList) throws Exception; + public abstract void waitForTablets(IContext ctx, String cell, String keyspace, List tabletTypeList) throws Exception; } diff --git a/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java b/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java index 472b907..df2d665 100644 --- a/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java +++ b/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java @@ -594,6 +594,10 @@ public void run() { CurrentShard.setShardReferences(keyspace, shardReferencesList); SrvKeyspaceCollector.getSrvKeyspaceTaskCounter().labels(keyspace, cell).inc(); } catch (TopoException e) { + if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { + log.warn("getSrvKeyspace error,cause by" + e.getMessage()); + continue; + } log.error("srvKeyspace-Timer getSrvKeyspace failed for " + cell + "/" + keyspace + ": " + e.getMessage()); SrvKeyspaceCollector.getSrvKeyspaceTaskErrorCounter().labels(keyspace, cell).inc(); } catch (Exception e) { diff --git a/src/main/java/com/jd/jdbc/srvtopo/SrvTopo.java b/src/main/java/com/jd/jdbc/srvtopo/SrvTopo.java index a8a17b8..05e8df6 100644 --- a/src/main/java/com/jd/jdbc/srvtopo/SrvTopo.java +++ b/src/main/java/com/jd/jdbc/srvtopo/SrvTopo.java @@ -18,29 +18,24 @@ package com.jd.jdbc.srvtopo; -import com.jd.jdbc.concurrency.AllErrorRecorder; +import com.jd.jdbc.common.util.CollectionUtils; import com.jd.jdbc.context.IContext; import com.jd.jdbc.key.CurrentShard; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.topo.TopoException; +import com.jd.jdbc.topo.TopoExceptionCode; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.util.threadpool.impl.VtDaemonExecutorService; import io.vitess.proto.Query; import io.vitess.proto.Topodata; import java.sql.SQLException; +import java.sql.SQLSyntaxErrorException; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; - -import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; public class SrvTopo { private static final Log log = LogFactory.getLog(SrvTopo.class); @@ -86,73 +81,50 @@ public static ResilientServer newResilientServer(TopoServer base, String counter * @return * @throws SrvTopoException */ - public static List findAllTargets(IContext ctx, SrvTopoServer srvTopoServer, String cell, Set keyspaceNameSet, List tabletTypeList) - throws InterruptedException, SQLException { + public static List findAllTargets(IContext ctx, SrvTopoServer srvTopoServer, String cell, String keyspace, List tabletTypeList) throws SQLException { List targetList = new ArrayList<>(); - CountDownLatch countDownLatch = new CountDownLatch(keyspaceNameSet.size()); - ReentrantLock lock = new ReentrantLock(true); - AllErrorRecorder errRecorder = new AllErrorRecorder(); - - for (String keyspace : keyspaceNameSet) { - VtDaemonExecutorService.execute(() -> { - try { - // Get SrvKeyspace for cell/keyspace. - ResilientServer.GetSrvKeyspaceResponse srvKeyspace = srvTopoServer.getSrvKeyspace(ctx, cell, keyspace); - Topodata.SrvKeyspace ks = srvKeyspace.getSrvKeyspace(); - Exception err = srvKeyspace.getException(); - if (err != null) { - if (TopoException.isErrType(err, NO_NODE)) { - // Possibly a race condition, or leftover - // crud in the topology service. Just log it. - log.error("GetSrvKeyspace(" + cell + ", " + keyspace + ") returned ErrNoNode, skipping that SrvKeyspace"); - } else { - // More serious error, abort. - errRecorder.recordError(err); - } - return; - } - - // Get all shard names that are used for serving. - for (Topodata.SrvKeyspace.KeyspacePartition ksPartition : ks.getPartitionsList()) { - // Check we're waiting for tablets of that type. - boolean waitForIt = false; - for (Topodata.TabletType tt : tabletTypeList) { - if (tt.equals(ksPartition.getServedType())) { - waitForIt = true; - break; - } - } - if (!waitForIt) { - continue; - } - - // Add all the shards. Note we can't have - // duplicates, as there is only one entry per - // TabletType in the Partitions list. - lock.lock(); - try { - CurrentShard.setShardReferences(keyspace, ksPartition.getShardReferencesList()); - for (Topodata.ShardReference shard : ksPartition.getShardReferencesList()) { - targetList.add(Query.Target.newBuilder() - .setCell(cell) - .setKeyspace(keyspace) - .setShard(shard.getName()) - .setTabletType(ksPartition.getServedType()) - .build()); - } - } finally { - lock.unlock(); - } - } - } finally { - countDownLatch.countDown(); - } - }); + // Get SrvKeyspace for cell/keyspace. + ResilientServer.GetSrvKeyspaceResponse srvKeyspace = srvTopoServer.getSrvKeyspace(ctx, cell, keyspace); + Topodata.SrvKeyspace ks = srvKeyspace.getSrvKeyspace(); + Exception err = srvKeyspace.getException(); + if (err != null) { + if (TopoException.isErrType(err, TopoExceptionCode.NO_NODE)) { + // Possibly a race condition, or leftover + // crud in the topology service. Just log it. + log.error("GetSrvKeyspace(" + cell + ", " + keyspace + ") returned ErrNoNode, skipping that SrvKeyspace"); + throw new SQLSyntaxErrorException("Unknown database '" + keyspace + "'"); + } else { + // More serious error, abort. + throw new SQLException(err); + } } - countDownLatch.await(10, TimeUnit.SECONDS); - if (errRecorder.hasErrors()) { - throw errRecorder.error(); + if (ks == null || CollectionUtils.isEmpty(ks.getPartitionsList())) { + throw new SQLSyntaxErrorException("Unknown database '" + keyspace + "'"); } + // Get all shard names that are used for serving. + for (Topodata.SrvKeyspace.KeyspacePartition ksPartition : ks.getPartitionsList()) { + // Check we're waiting for tablets of that type. + boolean waitForIt = false; + for (Topodata.TabletType tt : tabletTypeList) { + if (tt.equals(ksPartition.getServedType())) { + waitForIt = true; + break; + } + } + if (!waitForIt) { + continue; + } + CurrentShard.setShardReferences(keyspace, ksPartition.getShardReferencesList()); + for (Topodata.ShardReference shard : ksPartition.getShardReferencesList()) { + targetList.add(Query.Target.newBuilder() + .setCell(cell) + .setKeyspace(keyspace) + .setShard(shard.getName()) + .setTabletType(ksPartition.getServedType()) + .build()); + } + } + return targetList; } } diff --git a/src/main/java/com/jd/jdbc/srvtopo/TabletGateway.java b/src/main/java/com/jd/jdbc/srvtopo/TabletGateway.java index 40e2125..4e54098 100644 --- a/src/main/java/com/jd/jdbc/srvtopo/TabletGateway.java +++ b/src/main/java/com/jd/jdbc/srvtopo/TabletGateway.java @@ -34,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; import lombok.Data; @@ -62,20 +61,12 @@ public class TabletGateway extends Gateway { */ private ReentrantLock lock; - /** - * statusAggregators is a map indexed by the key - * keyspace/shard/tablet_type. - */ - private Map statusAggregators; - - private TabletGateway(SrvTopoServer srvTopoServer, Integer retryCount, - Map statusAggregators) { + private TabletGateway(SrvTopoServer srvTopoServer, Integer retryCount) { this.hc = HealthCheck.INSTANCE; this.srvTopoServer = srvTopoServer; this.retryCount = retryCount; this.tabletGatewayManager = new HashSet<>(16, 1); this.lock = new ReentrantLock(); - this.statusAggregators = statusAggregators; } public static TabletGateway build(SrvTopoServer serv) throws Exception { @@ -84,8 +75,7 @@ public static TabletGateway build(SrvTopoServer serv) throws Exception { return tabletGateways.get(serv); } - Map statusAggregators = new ConcurrentHashMap<>(16, 1); - TabletGateway gw = new TabletGateway(serv, 2, statusAggregators); + TabletGateway gw = new TabletGateway(serv, 2); gw.setQueryService(new RetryTabletQueryService(gw)); tabletGateways.put(serv, gw); @@ -94,22 +84,18 @@ public static TabletGateway build(SrvTopoServer serv) throws Exception { } @Override - public void waitForTablets(IContext ctx, String cell, Set keyspaceNameList, - List tabletTypeList) throws InterruptedException, SQLException { - Map noWatchedMap = new HashMap<>(keyspaceNameList.size()); - for (String keyspaceName : keyspaceNameList) { - for (Topodata.TabletType tabletType : tabletTypeList) { - String cacheKey = String.format("%s.%s.%s", cell, keyspaceName, tabletType); - if (!this.tabletGatewayManager.contains(cacheKey)) { - noWatchedMap.put(cacheKey, keyspaceName); - } + public void waitForTablets(IContext ctx, String cell, String keyspace, List tabletTypeList) throws SQLException { + Map noWatchedMap = new HashMap<>(); + for (Topodata.TabletType tabletType : tabletTypeList) { + String cacheKey = cell + "." + keyspace + "." + tabletType; + if (!this.tabletGatewayManager.contains(cacheKey)) { + noWatchedMap.put(cacheKey, keyspace); } } if (noWatchedMap.isEmpty()) { return; } - List targetList = SrvTopo.findAllTargets(ctx, srvTopoServer, cell, - new HashSet<>(noWatchedMap.values()), tabletTypeList); + List targetList = SrvTopo.findAllTargets(ctx, srvTopoServer, cell, keyspace, tabletTypeList); hc.waitForAllServingTablets(VtContext.withDeadline(ctx, 30, TimeUnit.SECONDS), Lists.newArrayList(targetList)); noWatchedMap.forEach((cacheKey, v) -> this.tabletGatewayManager.add(cacheKey)); } @@ -118,7 +104,4 @@ public void waitForTablets(IContext ctx, String cell, Set keyspaceNameLi public IQueryService queryServiceByAlias(Topodata.TabletAlias alias) { return hc.tabletConnection(alias); } - - public static class TabletStatusAggregator { - } } diff --git a/src/main/java/com/jd/jdbc/topo/TopoServer.java b/src/main/java/com/jd/jdbc/topo/TopoServer.java index 8846211..47a5d16 100644 --- a/src/main/java/com/jd/jdbc/topo/TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/TopoServer.java @@ -62,34 +62,20 @@ public class TopoServer implements Resource, TopoCellInfo, TopoCellsAliases, Top static final String CELLS_ALIAS_FILE = "CellsAlias"; - static final String KEYSPACE_FILE = "Keyspace"; - - static final String SHARD_FILE = "Shard"; - static final String VSCHEMA_FILE = "VSchema"; - static final String SHARD_REPLICATION_FILE = "ShardReplication"; - static final String TABLET_FILE = "Tablet"; - static final String SRV_VSCHEMA_FILE = "SrvVSchema"; - static final String SRV_KEYSPACE_FILE = "SrvKeyspace"; - static final String ROUTING_RULES_FILE = "RoutingRules"; - static final String CELLS_PATH = "cells"; static final String CELLS_ALIASES_PATH = "cells_aliases"; static final String KEYSPACES_PATH = "keyspaces"; - static final String SHARDS_PATH = "shards"; - static final String TABLETS_PATH = "tablets"; - static final String METADATA_PATH = "metadata"; - private final TopoCellsToAliasesMap cellsAliases; TopoConnection globalCell; @@ -364,20 +350,23 @@ public GetSrvKeyspaceNamesResponse getSrvKeyspaceNames(IContext ctx, String cell * @throws TopoException */ @Override - public List getTabletsByRange(IContext ctx, String cell) throws TopoException { + public List getTabletsByRange(IContext ctx, String cell) throws TopoException { TopoConnection topoConnection = this.connForCell(ctx, cell); List connGetResponseList = topoConnection.getTabletsByCell(ctx, TABLETS_PATH); - List topoTabletInfos = new ArrayList<>(connGetResponseList.size()); + List tablets = new ArrayList<>(connGetResponseList.size()); Topodata.Tablet tablet; try { for (ConnGetResponse connGetResponse : connGetResponseList) { tablet = Topodata.Tablet.parseFrom(connGetResponse.getContents()); - topoTabletInfos.add(new TopoTabletInfo(connGetResponse.getVersion(), tablet)); + if (tablet == null) { + continue; + } + tablets.add(tablet); } } catch (InvalidProtocolBufferException e) { throw TopoException.wrap(e.getMessage()); } - return topoTabletInfos; + return tablets; } /** @@ -387,7 +376,7 @@ public List getTabletsByRange(IContext ctx, String cell) throws * @throws TopoException */ @Override - public TopoTabletInfo getTablet(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException { + public Topodata.Tablet getTablet(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException { TopoConnection topoConnection = this.connForCell(ctx, tabletAlias.getCell()); String tabletPath = pathForTabletAlias(TopoProto.tabletAliasString(tabletAlias)); @@ -395,24 +384,24 @@ public TopoTabletInfo getTablet(IContext ctx, Topodata.TabletAlias tabletAlias) Topodata.Tablet tablet; try { tablet = Topodata.Tablet.parseFrom(connGetResponse.getContents()); + return tablet; } catch (InvalidProtocolBufferException e) { throw TopoException.wrap(e.getMessage()); } - return new TopoTabletInfo(connGetResponse.getVersion(), tablet); } @Override - public CompletableFuture getTabletFuture(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException { + public CompletableFuture getTabletFuture(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException { TopoConnection topoConnection = this.connForCell(ctx, tabletAlias.getCell()); String tabletPath = pathForTabletAlias(TopoProto.tabletAliasString(tabletAlias)); return topoConnection.getFuture(ctx, tabletPath).thenApply(connGetResponse -> { Topodata.Tablet tablet; try { tablet = Topodata.Tablet.parseFrom(connGetResponse.getContents()); + return tablet; } catch (InvalidProtocolBufferException e) { throw new CompletionException(TopoException.wrap(e.getMessage())); } - return new TopoTabletInfo(connGetResponse.getVersion(), tablet); }); } diff --git a/src/main/java/com/jd/jdbc/topo/TopoTablet.java b/src/main/java/com/jd/jdbc/topo/TopoTablet.java index d5db277..de02c78 100644 --- a/src/main/java/com/jd/jdbc/topo/TopoTablet.java +++ b/src/main/java/com/jd/jdbc/topo/TopoTablet.java @@ -31,11 +31,11 @@ public interface TopoTablet { * @return * @throws TopoException */ - TopoTabletInfo getTablet(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException; + Topodata.Tablet getTablet(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException; - CompletableFuture getTabletFuture(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException; + List getTabletsByRange(IContext ctx, String cell) throws TopoException; - List getTabletsByRange(IContext ctx, String cell) throws TopoException; + CompletableFuture getTabletFuture(IContext ctx, Topodata.TabletAlias tabletAlias) throws TopoException; /** * @param ctx diff --git a/src/main/java/com/jd/jdbc/topo/TopoTabletInfo.java b/src/main/java/com/jd/jdbc/topo/TopoTabletInfo.java deleted file mode 100644 index b3f7f68..0000000 --- a/src/main/java/com/jd/jdbc/topo/TopoTabletInfo.java +++ /dev/null @@ -1,57 +0,0 @@ -/* -Copyright 2021 JD Project Authors. Licensed under Apache-2.0. - -Copyright 2019 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package com.jd.jdbc.topo; - -import com.jd.jdbc.topo.topoproto.TopoProto; -import io.vitess.proto.Topodata; - -public class TopoTabletInfo implements TopoConnection.Version { - private TopoConnection.Version version; - - private Topodata.Tablet tablet; - - public TopoTabletInfo(TopoConnection.Version version, Topodata.Tablet tablet) { - this.version = version; - this.tablet = tablet; - } - - /** - * @return - */ - @Override - public String string() { - return String.format("Tablet{%s}", TopoProto.tabletAliasString(this.tablet.getAlias())); - } - - public TopoConnection.Version getVersion() { - return version; - } - - public void setVersion(TopoConnection.Version version) { - this.version = version; - } - - public Topodata.Tablet getTablet() { - return tablet; - } - - public void setTablet(Topodata.Tablet tablet) { - this.tablet = tablet; - } -} diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java index f88dded..0827082 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java @@ -227,7 +227,6 @@ public ConnGetResponse get(IContext ctx, String filePath, boolean ignoreNoNode) } ConnGetResponse connGetResponse = new ConnGetResponse(); connGetResponse.setContents(response.getKvs().get(0).getValue().getBytes()); - connGetResponse.setVersion(new Etcd2Version(response.getKvs().get(0).getModRevision())); return connGetResponse; } @@ -254,7 +253,6 @@ public List getTabletsByCell(IContext ctx, String filePath) thr for (KeyValue kv : response.getKvs()) { connGetResponse = new ConnGetResponse(); connGetResponse.setContents(kv.getValue().getBytes()); - connGetResponse.setVersion(new Etcd2Version(kv.getModRevision())); connGetResponseList.add(connGetResponse); } return connGetResponseList; @@ -271,7 +269,6 @@ public CompletableFuture getFuture(IContext ctx, String filePat } ConnGetResponse connGetResponse = new ConnGetResponse(); connGetResponse.setContents(response.getKvs().get(0).getValue().getBytes()); - connGetResponse.setVersion(new Etcd2Version(response.getKvs().get(0).getModRevision())); return connGetResponse; }); } diff --git a/src/main/java/com/jd/jdbc/util/SchemaUtil.java b/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java similarity index 97% rename from src/main/java/com/jd/jdbc/util/SchemaUtil.java rename to src/main/java/com/jd/jdbc/util/KeyspaceUtil.java index 1f9c960..9bf097c 100644 --- a/src/main/java/com/jd/jdbc/util/SchemaUtil.java +++ b/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java @@ -16,11 +16,10 @@ package com.jd.jdbc.util; -import io.netty.util.internal.StringUtil; - import static com.jd.jdbc.common.Constant.DEFAULT_DATABASE_PREFIX; +import io.netty.util.internal.StringUtil; -public class SchemaUtil { +public class KeyspaceUtil { public static String getLogicSchema(String tableCat) { if (StringUtil.isNullOrEmpty(tableCat)) { return tableCat; @@ -38,4 +37,4 @@ public static String getRealSchema(String tableCat) { } return DEFAULT_DATABASE_PREFIX + tableCat; } -} +} \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java index 3640731..c898dc2 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessConnection.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessConnection.java @@ -36,7 +36,7 @@ import com.jd.jdbc.sqlparser.utils.Utils; import com.jd.jdbc.srvtopo.Resolver; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import com.jd.jdbc.util.TimeUtil; import com.jd.jdbc.vitess.metadata.CachedDatabaseMetaData; import com.jd.jdbc.vitess.metadata.VitessDatabaseMetaData; @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Properties; -import java.util.Set; import java.util.TimeZone; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -99,13 +98,10 @@ public class VitessConnection extends AbstractVitessConnection { @Setter private Vtgate.Session session; - @Getter - private Set ksSet; - @Getter private String defaultKeyspace; - public VitessConnection(String url, Properties prop, TopoServer topoServer, Resolver resolver, Set ksSet, VSchemaManager vSchemaManager, String defaultKeyspace) throws SQLException { + public VitessConnection(String url, Properties prop, TopoServer topoServer, Resolver resolver, VSchemaManager vSchemaManager, String defaultKeyspace) throws SQLException { this.isClosed = false; this.url = url; this.properties = prop; @@ -119,7 +115,6 @@ public VitessConnection(String url, Properties prop, TopoServer topoServer, Reso this.ctx.setContextValue(ContextKey.CTX_TX_CONN, resolver.getScatterConn().getTxConn()); this.ctx.setContextValue(ContextKey.CTX_VSCHEMA_MANAGER, this.vm); - this.ksSet = ksSet; this.defaultKeyspace = defaultKeyspace; this.session = Vtgate.Session.newBuilder() @@ -446,7 +441,7 @@ public CachedDatabaseMetaData getCachedDatabaseMetaData() throws SQLException { } public void closeInnerConnection() { - List tabletList = HealthCheck.INSTANCE.getHealthyTablets(SchemaUtil.getLogicSchema(defaultKeyspace)); + List tabletList = HealthCheck.INSTANCE.getHealthyTablets(KeyspaceUtil.getLogicSchema(defaultKeyspace)); for (Topodata.Tablet tablet : tabletList) { IParentQueryService queryService = TabletDialer.dial(tablet); queryService.closeNativeQueryService(); diff --git a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java index da56518..50f01f7 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java @@ -54,10 +54,8 @@ import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashSet; import java.util.List; import java.util.Properties; -import java.util.Set; import java.util.concurrent.locks.ReentrantLock; import java.util.logging.Logger; @@ -96,9 +94,8 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr } try { - List keySpaces = Arrays.asList(prop.getProperty(Constant.DRIVER_PROPERTY_SCHEMA).split(",")); SecurityCenter.INSTANCE.addCredential(prop); - String defaultKeyspace = keySpaces.get(0); + String defaultKeyspace = prop.getProperty(Constant.DRIVER_PROPERTY_SCHEMA); String role = prop.getProperty(Constant.DRIVER_PROPERTY_ROLE_KEY, Constant.DRIVER_PROPERTY_ROLE_RW); if (!prop.containsKey(Constant.DRIVER_PROPERTY_ROLE_KEY)) { @@ -109,22 +106,21 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr List cells = Arrays.asList(prop.getProperty("cell").split(",")); String localCell = cells.get(0); - Set ksSet = new HashSet<>(keySpaces); for (String cell : cells) { - TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, ksSet); + TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, defaultKeyspace); } TabletGateway tabletGateway = TabletGateway.build(resilientServer); for (String cell : cells) { - TopologyWatcherManager.INSTANCE.watch(globalContext, cell, ksSet); + TopologyWatcherManager.INSTANCE.watch(globalContext, cell, defaultKeyspace); } boolean masterFlag = role.equalsIgnoreCase(Constant.DRIVER_PROPERTY_ROLE_RW); List tabletTypes = masterFlag ? Lists.newArrayList(Topodata.TabletType.MASTER) : Lists.newArrayList(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY); - tabletGateway.waitForTablets(globalContext, localCell, ksSet, tabletTypes); + tabletGateway.waitForTablets(globalContext, localCell, defaultKeyspace, tabletTypes); TxConn txConn = new TxConn(tabletGateway, Vtgate.TransactionMode.MULTI); ScatterConn scatterConn = ScatterConn.newScatterConn("VttabletCall", txConn, tabletGateway); @@ -143,7 +139,7 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr MonitorServer.getInstance().register(defaultKeyspace); VSchemaManager vSchemaManager = VSchemaManager.getInstance(topoServer); - vSchemaManager.initVschema(ksSet); + vSchemaManager.initVschema(defaultKeyspace); VtApiServer apiServer = VtApiServer.getInstance(); if (apiServer != null) { apiServer.register(defaultKeyspace, vSchemaManager); @@ -152,7 +148,7 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr log.warn("cannot get apiServer, register:" + defaultKeyspace + "skipped."); } } - return new VitessConnection(url, prop, topoServer, resolver, ksSet, vSchemaManager, defaultKeyspace); + return new VitessConnection(url, prop, topoServer, resolver, vSchemaManager, defaultKeyspace); } catch (Exception e) { throw new SQLException(e); } diff --git a/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java b/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java index 2918657..c328098 100644 --- a/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java +++ b/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java @@ -19,7 +19,7 @@ import com.jd.jdbc.pool.InnerConnection; import com.jd.jdbc.pool.StatefulConnectionPool; import com.jd.jdbc.queryservice.util.RoleUtils; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import com.jd.jdbc.vitess.VitessConnection; import com.jd.jdbc.vitess.resultset.DatabaseMetaDataResultSet; import java.io.IOException; @@ -332,10 +332,10 @@ private DatabaseMetaData getDatabaseMetaData(InnerConnection innerConnection) th } private String getCatalog(String catalog) { - return SchemaUtil.getRealSchema(catalog); + return KeyspaceUtil.getRealSchema(catalog); } private String getSchema(String schema) { - return SchemaUtil.getRealSchema(schema); + return KeyspaceUtil.getRealSchema(schema); } } \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/vitess/resultset/DatabaseMetaDataResultSet.java b/src/main/java/com/jd/jdbc/vitess/resultset/DatabaseMetaDataResultSet.java index bfd13b4..1dc5e13 100644 --- a/src/main/java/com/jd/jdbc/vitess/resultset/DatabaseMetaDataResultSet.java +++ b/src/main/java/com/jd/jdbc/vitess/resultset/DatabaseMetaDataResultSet.java @@ -17,7 +17,7 @@ package com.jd.jdbc.vitess.resultset; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import java.math.BigDecimal; import java.net.URL; import java.sql.Date; @@ -89,7 +89,7 @@ private DatabaseMetaDataObject generateDatabaseMetaDataObject(final int tableCat for (int i = 1; i <= indexMap.size(); i++) { if (tableCatIndex == i) { String tableCat = resultSet.getString(i); - tableCat = SchemaUtil.getLogicSchema(tableCat); + tableCat = KeyspaceUtil.getLogicSchema(tableCat); if (tableCat.equals("_vt")) { return null; } diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index f6ad2b2..3d272ff 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -18,7 +18,6 @@ package com.jd.jdbc.discovery; -import com.google.common.collect.Sets; import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus; @@ -766,11 +765,10 @@ public void testUnhealthyReplicaAsSecondsBehind() throws IOException, Interrupte printOk(); } - private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) throws TopoException { + private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) { for (String cell : cells) { - TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, Sets.newHashSet(keyspaceName)); + TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, keyspaceName); } - } private void closeQueryService(MockTablet... tablets) throws InterruptedException { @@ -782,10 +780,8 @@ private void closeQueryService(MockTablet... tablets) throws InterruptedExceptio private HealthCheck getHealthCheck() { HealthCheck hc = HealthCheck.INSTANCE; - Assert.assertEquals(0, hc.getHealthByAliasCopy().size()); Assert.assertEquals(0, hc.getHealthyCopy().size()); - return hc; } diff --git a/src/test/java/com/jd/jdbc/key/BinaryHashUtilTest.java b/src/test/java/com/jd/jdbc/key/BinaryHashUtilTest.java index 8a3e0eb..452aeda 100644 --- a/src/test/java/com/jd/jdbc/key/BinaryHashUtilTest.java +++ b/src/test/java/com/jd/jdbc/key/BinaryHashUtilTest.java @@ -16,15 +16,25 @@ package com.jd.jdbc.key; -import org.junit.Rule; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; import org.junit.Test; -import org.junit.rules.ExpectedException; +import testsuite.TestSuite; +import testsuite.internal.TestSuiteShardSpec; -public class BinaryHashUtilTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); +public class BinaryHashUtilTest extends TestSuite { @Test - public void getShardByVindex() { + public void getShardByVindex() throws SQLException { + String hashvalue = BinaryHashUtil.getShardByVindex(ShardEnum.TWO_SHARDS, "hashvalue"); + System.out.println(hashvalue);// -80 + } + + @Test + public void getShardByVindexByKeyspace() throws SQLException { + Connection connection = DriverManager.getConnection(getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS))); + String hashvalue = BinaryHashUtil.getShardByVindex(connection.getCatalog(), "hashvalue"); + System.out.println(hashvalue);// -80 } } \ No newline at end of file diff --git a/src/test/java/com/jd/jdbc/sqlparser/ChangeSchemaTest.java b/src/test/java/com/jd/jdbc/sqlparser/ChangeSchemaTest.java index e19db74..c8481ab 100644 --- a/src/test/java/com/jd/jdbc/sqlparser/ChangeSchemaTest.java +++ b/src/test/java/com/jd/jdbc/sqlparser/ChangeSchemaTest.java @@ -18,7 +18,7 @@ import com.jd.jdbc.sqlparser.ast.SQLStatement; import com.jd.jdbc.sqlparser.dialect.mysql.visitor.VtChangeSchemaVisitor; -import com.jd.jdbc.util.SchemaUtil; +import com.jd.jdbc.util.KeyspaceUtil; import com.jd.jdbc.vitess.VitessConnection; import com.jd.jdbc.vitess.VitessStatement; import io.netty.util.internal.StringUtil; @@ -214,7 +214,7 @@ public void case10_ParseStatements() throws SQLException { try (VitessStatement vitessStatement = (VitessStatement) vitessConnection.createStatement()) { VitessStatement.ParseResult parseResult = vitessStatement.parseStatements("select * from " + defaultKeyspace + ".t"); Assert.assertEquals(defaultKeyspace, parseResult.getSchema()); - Assert.assertEquals("select * from " + SchemaUtil.getRealSchema(defaultKeyspace) + ".t", SQLUtils.toMySqlString(parseResult.getStatement(), SQLUtils.NOT_FORMAT_OPTION)); + Assert.assertEquals("select * from " + KeyspaceUtil.getRealSchema(defaultKeyspace) + ".t", SQLUtils.toMySqlString(parseResult.getStatement(), SQLUtils.NOT_FORMAT_OPTION)); } // different database expr no watch @@ -224,13 +224,6 @@ public void case10_ParseStatements() throws SQLException { Assert.assertEquals("unexpected keyspace (ks) in sql: select * from ks.tb", e.getMessage()); } - // different database expr in SQL - replace different database expr, defaultKeyspace is 'vt_' + different database expr - try (VitessStatement vitessStatement = (VitessStatement) vitessConnection.createStatement()) { - vitessConnection.getKsSet().add("ks"); - VitessStatement.ParseResult parseResult = vitessStatement.parseStatements("select * from ks.tb"); - Assert.assertEquals("ks", parseResult.getSchema()); - Assert.assertEquals("select * from " + SchemaUtil.getRealSchema("ks") + ".tb", SQLUtils.toMySqlString(parseResult.getStatement(), SQLUtils.NOT_FORMAT_OPTION)); - } clearAll(); } diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java index 5f638a6..6d3b4ff 100644 --- a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java @@ -26,7 +26,6 @@ import com.jd.jdbc.topo.TopoException; import com.jd.jdbc.topo.TopoExceptionCode; import com.jd.jdbc.topo.TopoServer; -import com.jd.jdbc.topo.TopoTabletInfo; import com.jd.jdbc.util.threadpool.impl.VtDaemonExecutorService; import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; @@ -40,7 +39,6 @@ import java.time.Duration; import java.util.ArrayList; import java.util.Date; -import java.util.HashSet; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; @@ -226,9 +224,7 @@ public void case07_testFindAllTargets() throws Exception { ResilientServer resilientServer = SrvTopo.newResilientServer(topoServer, ""); VtDaemonExecutorService.initialize(null, null, null); - List targetList = SrvTopo.findAllTargets(VtContext.withCancel(VtContext.background()), resilientServer, TOPO_CELL, new HashSet() {{ - add(TOPO_KEYSPACE); - }}, new ArrayList() {{ + List targetList = SrvTopo.findAllTargets(VtContext.withCancel(VtContext.background()), resilientServer, TOPO_CELL, TOPO_KEYSPACE, new ArrayList() {{ add(Topodata.TabletType.MASTER); add(Topodata.TabletType.REPLICA); add(Topodata.TabletType.RDONLY); @@ -246,8 +242,8 @@ public void case08_testTimer() throws Exception { private void commonTestServer(TopoServer topoServer) throws TopoException { List tabletAliasList = topoServer.getTabletAliasByCell(VtContext.withCancel(VtContext.background()), TOPO_CELL); for (Topodata.TabletAlias tabletAlias : tabletAliasList) { - TopoTabletInfo topoTabletInfo = topoServer.getTablet(VtContext.withCancel(VtContext.background()), tabletAlias); - Assert.assertNotNull(topoTabletInfo); + Topodata.Tablet tablet = topoServer.getTablet(VtContext.withCancel(VtContext.background()), tabletAlias); + Assert.assertNotNull(tablet); } Topodata.SrvKeyspace srvKeyspace = topoServer.getSrvKeyspace(VtContext.withCancel(VtContext.background()), TOPO_CELL, TOPO_KEYSPACE); diff --git a/src/test/java/com/jd/jdbc/util/InnerConnectionPoolUtil.java b/src/test/java/com/jd/jdbc/util/InnerConnectionPoolUtil.java index 22950de..bffcbf2 100644 --- a/src/test/java/com/jd/jdbc/util/InnerConnectionPoolUtil.java +++ b/src/test/java/com/jd/jdbc/util/InnerConnectionPoolUtil.java @@ -51,7 +51,7 @@ public static void clearAll() throws NoSuchFieldException, IllegalAccessExceptio } private static void clearByKeyspace(String keyspace) { - List tabletList = HealthCheck.INSTANCE.getHealthyTablets(SchemaUtil.getLogicSchema(keyspace)); + List tabletList = HealthCheck.INSTANCE.getHealthyTablets(KeyspaceUtil.getLogicSchema(keyspace)); for (Topodata.Tablet tablet : tabletList) { IParentQueryService queryService = TabletDialer.dial(tablet); queryService.closeNativeQueryService(); diff --git a/src/test/java/testsuite/TestSuite.java b/src/test/java/testsuite/TestSuite.java index 260b375..70ad7f2 100644 --- a/src/test/java/testsuite/TestSuite.java +++ b/src/test/java/testsuite/TestSuite.java @@ -57,10 +57,23 @@ protected static Connection getConnection(TestSuiteEnv env) throws SQLException return env.getDevConnection(); } + public static void closeConnection(Connection... conns) { + for (Connection conn : conns) { + if (conn != null) { + try { + conn.close(); + } catch (SQLException throwables) { + printFail(throwables.getMessage()); + throw new RuntimeException(throwables); + } + } + } + } + protected static ExecutorService getThreadPool(int num, int max) { ExecutorService pool = new ThreadPoolExecutor(num, max, 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue(), + new LinkedBlockingQueue<>(), new ThreadFactory() { private final AtomicInteger threadNumber = new AtomicInteger(1); @@ -124,9 +137,9 @@ protected void replaceKeyspace(List caseList, String ke } } - public void sleep(long millsec) { + public void sleep(long second) { try { - TimeUnit.SECONDS.sleep(millsec); + TimeUnit.SECONDS.sleep(second); } catch (InterruptedException e) { throw new RuntimeException(e); } diff --git a/src/test/java/testsuite/internal/environment/DriverEnv.java b/src/test/java/testsuite/internal/environment/DriverEnv.java index 721d1a0..686988b 100644 --- a/src/test/java/testsuite/internal/environment/DriverEnv.java +++ b/src/test/java/testsuite/internal/environment/DriverEnv.java @@ -20,9 +20,8 @@ import java.sql.SQLException; import testsuite.internal.TestSuiteShardSpec; import testsuite.internal.config.DriverJdbcCfg; -import testsuite.internal.config.TestSuiteCfgReader; - import static testsuite.internal.config.TestSuiteCfgPath.DEV; +import testsuite.internal.config.TestSuiteCfgReader; public class DriverEnv extends TestSuiteEnv { diff --git a/src/test/java/testsuite/internal/environment/TestSuiteEnv.java b/src/test/java/testsuite/internal/environment/TestSuiteEnv.java index 33e74ec..c59d06d 100644 --- a/src/test/java/testsuite/internal/environment/TestSuiteEnv.java +++ b/src/test/java/testsuite/internal/environment/TestSuiteEnv.java @@ -35,11 +35,6 @@ public TestSuiteEnv(TestSuiteShardSpec shardSpec) { @Override public Connection getConnection(String url) throws SQLException { - try { - return DriverManager.getConnection(url); - } catch (SQLException e) { - e.printStackTrace(); - } - return null; + return DriverManager.getConnection(url); } }