diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index b645c6f..7c70eb0 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -344,7 +344,7 @@ public void removeTablet(Topodata.Tablet tablet) { try { thc = this.healthByAlias.get(tabletAlias); if (thc == null) { - log.error("we have no health data for tablet " + TopoProto.tabletToHumanString(tablet) + ", it might have been delete already"); + log.info("we have no health data for tablet " + TopoProto.tabletToHumanString(tablet) + ", it might have been delete already"); return; } diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java index 70e5200..7a717df 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java @@ -78,6 +78,15 @@ public TopologyWatcher(TopoServer ts, String cell, String tabletKeyspace) { log.info("start topo watcher for cell: " + cell); } + public TopologyWatcher(TopoServer ts, String cell, Set tabletKeyspaces) { + this.ts = ts; + this.hc = HealthCheck.INSTANCE; + this.cell = cell; + this.ksSet.addAll(tabletKeyspaces); + this.firstLoadTabletsFlag = true; + log.info("start topo watcher for cell: " + cell); + } + private void loadTablets(IContext ctx) { try { Map newTablets = getTopoTabletInfoMap(ctx); @@ -85,19 +94,24 @@ private void loadTablets(IContext ctx) { TopologyCollector.getCounter().labels(cell).inc(); } catch (Throwable t) { TopologyCollector.getErrorCounter().labels(cell).inc(); - log.error("Unexpected error occur at loadTablets, cause: " + t.getMessage(), t); + log.error("Unexpected Throwable error occur at loadTablets, cause: " + t.getMessage(), t); } } private void connectTablets(Map newTablets) { - if (CollectionUtils.isEmpty(newTablets)) { - for (Map.Entry entry : currentTablets.entrySet()) { - hc.removeTablet(entry.getValue()); - } - return; - } this.lock.lock(); try { + if (CollectionUtils.isEmpty(newTablets)) { + if (currentTablets == null || currentTablets.isEmpty()) { + return; + } + for (Map.Entry entry : currentTablets.entrySet()) { + hc.removeTablet(entry.getValue()); + } + currentTablets = new ConcurrentHashMap<>(); + return; + } + for (Map.Entry entry : newTablets.entrySet()) { Topodata.Tablet newTablet = entry.getValue(); Topodata.Tablet oldTablet = currentTablets.get(entry.getKey()); @@ -208,8 +222,19 @@ public void watchKeyspace(IContext ctx, String tabletKeyspace) { if (ksSet.contains(tabletKeyspace)) { return; } + + this.lock.lock(); + try { + if (ksSet.contains(tabletKeyspace)) { + return; + } + this.ksSet.add(tabletKeyspace); + } finally { + this.lock.unlock(); + } + log.info("topo watcher for cell " + cell + " watches: " + tabletKeyspace); - this.ksSet.add(tabletKeyspace); + Set tablets = this.ignoreTopo.watchKs(tabletKeyspace); if (CollectionUtils.isEmpty(tablets)) { return; @@ -229,6 +254,9 @@ public void watchKeyspace(IContext ctx, String tabletKeyspace) { public void close() { timer.cancel(); + if (currentTablets == null) { + return; + } for (Map.Entry entry : currentTablets.entrySet()) { hc.removeTablet(entry.getValue()); } diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java index bcff6d5..d756419 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java @@ -18,10 +18,22 @@ package com.jd.jdbc.discovery; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.jd.jdbc.common.util.CollectionUtils; import com.jd.jdbc.context.IContext; +import com.jd.jdbc.sqlparser.support.logging.Log; +import com.jd.jdbc.sqlparser.support.logging.LogFactory; +import com.jd.jdbc.topo.TopoException; import com.jd.jdbc.topo.TopoServer; +import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -31,17 +43,36 @@ public enum TopologyWatcherManager { */ INSTANCE; + private Map cellTopologyWatcherMap = null; + + private Map> globalKeyspacesMap = null; + private final Lock lock = new ReentrantLock(); - private Map cellTopologyWatcherMap = null; + private static final Log LOGGER = LogFactory.getLog(TopologyWatcherManager.class); + + private ScheduledThreadPoolExecutor scheduledExecutor; TopologyWatcherManager() { cellTopologyWatcherMap = new ConcurrentHashMap<>(16); + globalKeyspacesMap = new ConcurrentHashMap<>(16); + + scheduledExecutor = new ScheduledThreadPoolExecutor(1, VtThreadFactoryBuilder.build("reload-cell-schedule")); + scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); + scheduledExecutor.setRemoveOnCancelPolicy(true); } public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace) { lock.lock(); try { + String serverAddress = topoServer.getServerAddress(); + if (!globalKeyspacesMap.containsKey(serverAddress)) { + globalKeyspacesMap.put(serverAddress, new HashSet<>()); + + startTickerReloadCell(ctx, topoServer, TimeUnit.MINUTES); + } + globalKeyspacesMap.get(serverAddress).add(tabletKeyspace); + if (!cellTopologyWatcherMap.containsKey(cell)) { TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace); topologyWatcher.start(ctx); @@ -60,10 +91,67 @@ public void watch(IContext ctx, String cell, String tabletKeyspace) { } public void close() { + closeScheduledExecutor(); + for (Map.Entry entry : cellTopologyWatcherMap.entrySet()) { TopologyWatcher topologyWatcher = entry.getValue(); topologyWatcher.close(); } cellTopologyWatcherMap.clear(); + globalKeyspacesMap.clear(); + } + + public void resetScheduledExecutor() { + closeScheduledExecutor(); + + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reload-cell-schedule").setDaemon(true).build(); + scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory); + } + + public void closeScheduledExecutor() { + scheduledExecutor.shutdownNow(); + try { + int tryAgain = 3; + while (tryAgain > 0 && !scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS)) { + tryAgain--; + } + } catch (InterruptedException e) { + // We're shutting down anyway, so just ignore. + } + } + + public boolean isWatching(String cell) { + return cellTopologyWatcherMap.containsKey(cell); + } + + public void startTickerReloadCell(IContext globalContext, TopoServer topoServer, TimeUnit timeUnit) { + scheduledExecutor.scheduleWithFixedDelay(() -> { + try { + tickerUpdateCells(globalContext, topoServer); + } catch (Throwable e) { + LOGGER.error("tickerUpdateCells error: " + e); + } + }, 5, 10, timeUnit); + } + + private void tickerUpdateCells(IContext globalContext, TopoServer topoServer) throws TopoException { + String serverAddress = topoServer.getServerAddress(); + Set keyspaceSet = globalKeyspacesMap.get(serverAddress); + if (CollectionUtils.isEmpty(keyspaceSet)) { + throw new RuntimeException("not found keyspace in " + serverAddress + " of TopologyWatcherManager.globalKeyspacesMap ."); + } + List allCells = topoServer.getAllCells(globalContext); + for (String cell : allCells) { + if (!isWatching(cell)) { + lock.lock(); + try { + TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keyspaceSet); + topologyWatcher.start(globalContext); + cellTopologyWatcherMap.put(cell, topologyWatcher); + } finally { + lock.unlock(); + } + } + } } } diff --git a/src/main/java/com/jd/jdbc/topo/Topo.java b/src/main/java/com/jd/jdbc/topo/Topo.java index 2406b88..b3057e5 100644 --- a/src/main/java/com/jd/jdbc/topo/Topo.java +++ b/src/main/java/com/jd/jdbc/topo/Topo.java @@ -18,6 +18,13 @@ package com.jd.jdbc.topo; +import com.jd.jdbc.topo.etcd2topo.Etcd2TopoFactory; +import io.vitess.proto.Vtrpc; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import static com.jd.jdbc.topo.TopoExceptionCode.NO_IMPLEMENTATION; import static com.jd.jdbc.topo.TopoServer.CELLS_PATH; import static com.jd.jdbc.topo.TopoServer.CELL_INFO_FILE; @@ -28,12 +35,6 @@ import static com.jd.jdbc.topo.TopoServer.TABLETS_PATH; import static com.jd.jdbc.topo.TopoServer.TABLET_FILE; import static com.jd.jdbc.topo.TopoServer.VSCHEMA_FILE; -import com.jd.jdbc.topo.etcd2topo.Etcd2TopoFactory; -import io.vitess.proto.Vtrpc; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; public class Topo { @@ -125,6 +126,7 @@ protected static TopoServer newWithFactory(TopoFactory topoFactory, String serve topoServer.globalReadOnlyCell = connReadOnly; topoServer.topoFactory = topoFactory; topoServer.cells = new HashMap<>(16); + topoServer.serverAddress = serverAddress; return topoServer; } diff --git a/src/main/java/com/jd/jdbc/topo/TopoServer.java b/src/main/java/com/jd/jdbc/topo/TopoServer.java index 7291acb..05c607d 100644 --- a/src/main/java/com/jd/jdbc/topo/TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/TopoServer.java @@ -21,13 +21,9 @@ import com.google.protobuf.InvalidProtocolBufferException; import com.jd.jdbc.context.IContext; import com.jd.jdbc.key.CurrentShard; -import static com.jd.jdbc.topo.Topo.pathForCellInfo; -import static com.jd.jdbc.topo.Topo.pathForSrvKeyspaceFile; -import static com.jd.jdbc.topo.Topo.pathForTabletAlias; -import static com.jd.jdbc.topo.Topo.pathForVschemaFile; -import static com.jd.jdbc.topo.TopoConnection.ConnGetResponse; +import com.jd.jdbc.sqlparser.support.logging.Log; +import com.jd.jdbc.sqlparser.support.logging.LogFactory; import com.jd.jdbc.topo.TopoConnection.DirEntry; -import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; import com.jd.jdbc.topo.topoproto.TopoProto; import io.vitess.proto.Topodata; import java.util.ArrayList; @@ -41,8 +37,17 @@ import java.util.concurrent.locks.ReentrantLock; import vschema.Vschema; +import static com.jd.jdbc.topo.Topo.pathForCellInfo; +import static com.jd.jdbc.topo.Topo.pathForSrvKeyspaceFile; +import static com.jd.jdbc.topo.Topo.pathForTabletAlias; +import static com.jd.jdbc.topo.Topo.pathForVschemaFile; +import static com.jd.jdbc.topo.TopoConnection.ConnGetResponse; +import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; + public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, TopoTablet, TopoVschema { + private static final Log log = LogFactory.getLog(TopoServer.class); + static final String GLOBAL_CELL = "global"; static final String GLOBAL_READ_ONLY_CELL = "global-read-only"; @@ -73,6 +78,8 @@ public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, Topo Map cells; + String serverAddress; + /** * */ @@ -183,6 +190,33 @@ public static Map getSrvkeyspaceMapCopy() { return new HashMap<>(SRVKEYSPACE_MAP); } + public List getAllCells(IContext ctx) throws TopoException { + List dirEntryList = this.globalCell.listDir(ctx, CELLS_PATH, false, false); + List cells = Topo.dirEntriesToStringArray(dirEntryList); + if (cells.size() < 1) { + throw TopoException.wrap("Cells Information missing"); + } + return cells; + } + + public String getLocalCell(IContext globalContext, TopoServer topoServer, List cells, String defaultKeyspace) throws TopoException { + String localCell = cells.get(0); + for (String cell : cells) { + try { + Topodata.SrvKeyspace getSrvKeyspace = topoServer.getSrvKeyspace(globalContext, cell, defaultKeyspace); + if (getSrvKeyspace != null) { + localCell = cell; + return localCell; + } + } catch (TopoException e) { + if (e.getCode() != TopoExceptionCode.NO_NODE) { + throw TopoException.wrap(e.getMessage()); + } + } + } + return localCell; + } + /** * @param ctx * @param cell @@ -309,6 +343,7 @@ public List getTabletAliasByCell(IContext ctx, String cell return result; } catch (TopoException e) { if (TopoException.isErrType(e, NO_NODE)) { + log.debug("failed to get tablet in cell " + cell + " . error: " + e); return null; } throw e; @@ -352,4 +387,8 @@ public Vschema.Keyspace getVschema(IContext ctx, String keyspaceName) throws Top } return keyspace; } + + public String getServerAddress() { + return this.serverAddress; + } } \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/topo/TopoStatsConnection.java b/src/main/java/com/jd/jdbc/topo/TopoStatsConnection.java index 122b116..e75a639 100644 --- a/src/main/java/com/jd/jdbc/topo/TopoStatsConnection.java +++ b/src/main/java/com/jd/jdbc/topo/TopoStatsConnection.java @@ -44,7 +44,7 @@ public CompletableFuture> listDirFuture(IContext ctx, String dirP @Override public Version create(IContext ctx, String filePath, byte[] contents) throws TopoException { - return null; + return this.topoConnection.create(ctx, filePath, contents); } @Override diff --git a/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java b/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java index 9bf097c..294e457 100644 --- a/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java +++ b/src/main/java/com/jd/jdbc/util/KeyspaceUtil.java @@ -16,9 +16,10 @@ package com.jd.jdbc.util; -import static com.jd.jdbc.common.Constant.DEFAULT_DATABASE_PREFIX; import io.netty.util.internal.StringUtil; +import static com.jd.jdbc.common.Constant.DEFAULT_DATABASE_PREFIX; + public class KeyspaceUtil { public static String getLogicSchema(String tableCat) { if (StringUtil.isNullOrEmpty(tableCat)) { @@ -37,4 +38,8 @@ public static String getRealSchema(String tableCat) { } return DEFAULT_DATABASE_PREFIX + tableCat; } + + public static String getTabletKeyspace(String keyspace) { + return keyspace; + } } \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java index 8d95cfd..4cf33ba 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessDriver.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessDriver.java @@ -42,6 +42,7 @@ import com.jd.jdbc.tindexes.SplitTableUtil; import com.jd.jdbc.topo.Topo; import com.jd.jdbc.topo.TopoServer; +import com.jd.jdbc.util.KeyspaceUtil; import com.jd.jdbc.util.threadpool.InitThreadPoolService; import com.jd.jdbc.vindexes.hash.BinaryHash; import io.prometheus.client.Histogram; @@ -52,7 +53,6 @@ import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Properties; import java.util.concurrent.locks.ReentrantLock; @@ -93,37 +93,42 @@ public Connection initConnect(String url, Properties info, boolean initOnly) thr try { SecurityCenter.INSTANCE.addCredential(prop); String defaultKeyspace = prop.getProperty(Constant.DRIVER_PROPERTY_SCHEMA); + String tabletKeyspace = KeyspaceUtil.getTabletKeyspace(defaultKeyspace); String role = prop.getProperty(Constant.DRIVER_PROPERTY_ROLE_KEY, Constant.DRIVER_PROPERTY_ROLE_RW); if (!prop.containsKey(Constant.DRIVER_PROPERTY_ROLE_KEY)) { prop.put(Constant.DRIVER_PROPERTY_ROLE_KEY, role); } TopoServer topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, "http://" + prop.getProperty("host") + ":" + prop.getProperty("port")); + ResilientServer resilientServer = SrvTopo.newResilientServer(topoServer, "ResilientSrvTopoServer"); - List cells = Arrays.asList(prop.getProperty("cell").split(",")); - String localCell = cells.get(0); + List cells = topoServer.getAllCells(globalContext); + for (String cell : cells) { - TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, defaultKeyspace); + TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, tabletKeyspace); } TabletGateway tabletGateway = TabletGateway.build(resilientServer); for (String cell : cells) { - TopologyWatcherManager.INSTANCE.watch(globalContext, cell, defaultKeyspace); + TopologyWatcherManager.INSTANCE.watch(globalContext, cell, tabletKeyspace); } + + String localCell = topoServer.getLocalCell(globalContext, topoServer, cells, tabletKeyspace); + boolean masterFlag = role.equalsIgnoreCase(Constant.DRIVER_PROPERTY_ROLE_RW); List tabletTypes = masterFlag ? Lists.newArrayList(Topodata.TabletType.MASTER) : Lists.newArrayList(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY); - tabletGateway.waitForTablets(globalContext, localCell, defaultKeyspace, tabletTypes); + tabletGateway.waitForTablets(globalContext, localCell, tabletKeyspace, tabletTypes); TxConn txConn = new TxConn(tabletGateway, Vtgate.TransactionMode.MULTI); ScatterConn scatterConn = ScatterConn.newScatterConn("VttabletCall", txConn, tabletGateway); Resolver resolver = new Resolver(resilientServer, tabletGateway, localCell, scatterConn); Topodata.TabletType tabletType = VitessJdbcProperyUtil.getTabletType(prop); - List resolvedShardList = resolver.getAllShards(globalContext, defaultKeyspace, Topodata.TabletType.MASTER).getResolvedShardList(); + List resolvedShardList = resolver.getAllShards(globalContext, tabletKeyspace, Topodata.TabletType.MASTER).getResolvedShardList(); int shardNumber = CollectionUtils.isEmpty(resolvedShardList) ? 0 : resolvedShardList.size(); Config.setConfig(prop, defaultKeyspace, SecurityCenter.INSTANCE.getCredential(defaultKeyspace).getUser(), tabletType, shardNumber); if (masterFlag) { @@ -208,5 +213,4 @@ public boolean jdbcCompliant() { public Logger getParentLogger() throws SQLFeatureNotSupportedException { throw new SQLFeatureNotSupportedException(); } - } \ No newline at end of file diff --git a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java index 6b22654..cf840ef 100644 --- a/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessJdbcProperyUtil.java @@ -64,16 +64,6 @@ public static void checkCredentials(String path, Properties info) throws SQLExce } } - public static void checkCell(Properties info) throws SQLException { - if (info.getProperty("cell") == null) { - throw new SQLException("no cell found in jdbc url"); - } - String[] cells = info.getProperty("cell").split(","); - if (cells.length < 1) { - throw new SQLException("no cell found in jdbc url"); - } - } - public static void checkSchema(String path) throws SQLException { if (path == null || !path.startsWith("/")) { throw new SQLException("wrong database name path: '" + path + "'"); diff --git a/src/main/java/com/jd/jdbc/vitess/VitessJdbcUrlParser.java b/src/main/java/com/jd/jdbc/vitess/VitessJdbcUrlParser.java index f8eae1e..2b5202b 100755 --- a/src/main/java/com/jd/jdbc/vitess/VitessJdbcUrlParser.java +++ b/src/main/java/com/jd/jdbc/vitess/VitessJdbcUrlParser.java @@ -68,7 +68,6 @@ public static Properties parse(String url, Properties info) throws SQLException } } - VitessJdbcProperyUtil.checkCell(parsedProperties); VitessJdbcProperyUtil.checkCredentials(path, parsedProperties); VitessJdbcProperyUtil.checkServerTimezone(parsedProperties); VitessJdbcProperyUtil.addDefaultProperties(parsedProperties); diff --git a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java index c96e314..0cb1c13 100644 --- a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java +++ b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java @@ -19,6 +19,7 @@ package com.jd.jdbc.discovery; import com.jd.jdbc.common.Constant; +import com.jd.jdbc.context.VtContext; import com.jd.jdbc.monitor.SrvKeyspaceCollector; import com.jd.jdbc.topo.Topo; import com.jd.jdbc.topo.TopoConnection; @@ -37,7 +38,6 @@ import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; import java.sql.SQLException; -import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Objects; @@ -90,9 +90,10 @@ public void init() throws TopoException, SQLException { String connectionUrl = getConnectionUrl(Driver.of(TestSuiteShardSpec.TWO_SHARDS)); Properties prop = VitessJdbcUrlParser.parse(connectionUrl, null); keyspace = prop.getProperty(Constant.DRIVER_PROPERTY_SCHEMA); - cells = Arrays.asList(prop.getProperty("cell").split(",")); + String topoServerAddress = "http://" + prop.getProperty("host") + ":" + prop.getProperty("port"); topoServer = Topo.getTopoServer(Topo.TopoServerImplementType.TOPO_IMPLEMENTATION_ETCD2, topoServerAddress); + cells = topoServer.getAllCells(VtContext.withCancel(VtContext.background())); cell = cells.get(0); keyspacePrefix = "testkeyspace"; diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index 212609d..3ff1836 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -23,8 +23,6 @@ import com.jd.jdbc.context.VtContext; import com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus; import com.jd.jdbc.monitor.HealthyCollector; -import com.jd.jdbc.queryservice.CombinedQueryService; -import com.jd.jdbc.queryservice.IParentQueryService; import com.jd.jdbc.queryservice.MockQueryServer; import com.jd.jdbc.queryservice.TabletDialerAgent; import com.jd.jdbc.topo.MemoryTopoFactory; @@ -33,10 +31,6 @@ import com.jd.jdbc.topo.topoproto.TopoProto; import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; import com.jd.jdbc.util.threadpool.impl.VtQueryExecutorService; -import io.grpc.ManagedChannel; -import io.grpc.Server; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; import io.vitess.proto.Query; import io.vitess.proto.Topodata; @@ -47,14 +41,9 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import lombok.AllArgsConstructor; -import lombok.Getter; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -66,12 +55,13 @@ import testsuite.TestSuite; public class HealthCheckTest extends TestSuite { - private static final IContext globalContext = VtContext.withCancel(VtContext.background()); - - private static final ExecutorService executorService = getThreadPool(10, 10); @Rule - public GrpcCleanupRule grpcCleanup; + public GrpcCleanupRule grpcCleanup = new GrpcCleanupRule(); + + private final IContext globalContext = VtContext.withCancel(VtContext.background()); + + private static final ExecutorService executorService = getThreadPool(10, 10); @Rule public ExpectedException thrown = ExpectedException.none(); @@ -89,18 +79,17 @@ public static void initPool() { VtQueryExecutorService.initialize(null, null, null, null); } - @AfterClass - public static void afterClass() { - executorService.shutdownNow(); - } - @Before public void init() throws IOException { - grpcCleanup = new GrpcCleanupRule(); portMap.put("vt", 1); portMap.put("grpc", 2); } + @AfterClass + public static void afterClass() { + executorService.shutdownNow(); + } + @After public void resetHealthCheck() { HealthCheck.resetHealthCheck(); @@ -113,38 +102,37 @@ public void resetHealthCheck() { * 2. testing if changing tablet status can be watched right; * 3. testing if removeTable function can work well */ - @Test - public void testHealthCheck() throws InterruptedException { + public void testHealthCheck() { printComment("1. HealthCheck Test"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); hc.removeTablet(mockTablet.getTablet()); - Thread.sleep(2000); + sleepMillisSeconds(2000); Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @@ -154,38 +142,38 @@ public void testHealthCheck() throws InterruptedException { * 3. testing if tablet can be remove from healthy when receive error message from tablet server */ @Test - public void testHealthCheckStreamError() throws InterruptedException { + public void testHealthCheckStreamError() { printComment("2. HealthCheck Test for Error Stream Message"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); sendOnErrorMessage(mockTablet); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); if (hcList != null) { Assert.assertEquals(0, hcList.size()); } - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @@ -195,24 +183,24 @@ public void testHealthCheckStreamError() throws InterruptedException { * 3. testing if changing the type of tablet to primary; */ @Test - public void testHealthCheckExternalReparent() throws InterruptedException { + public void testHealthCheckExternalReparent() { printComment("3. HealthCheck Test one tablet External Reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -220,7 +208,7 @@ public void testHealthCheckExternalReparent() throws InterruptedException { printComment("d. Change the Tablet role to primary"); sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, true, 0, 0.2, 0); - Thread.sleep(200); + sleepMillisSeconds(200); Topodata.Tablet tablet = hc.getHealthyTablets("k", Topodata.TabletType.MASTER); Assert.assertNotNull(tablet); @@ -235,33 +223,33 @@ public void testHealthCheckExternalReparent() throws InterruptedException { new AtomicBoolean(false), Query.RealtimeStats.newBuilder().setCpuUsage(0.2).setSecondsBehindMaster(0).build()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testHealthCheckTwoExternalReparent() throws InterruptedException { + public void testHealthCheckTwoExternalReparent() { printComment("4. HealthCheck Test two tablets External Reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add two Tablets"); - MockTablet mockTablet1 = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); - MockTablet mockTablet2 = buildMockTablet("cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet2.getTablet()); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Change two Tablets status"); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 10); sendOnNextMessage(mockTablet1, Topodata.TabletType.MASTER, true, 10, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 2, hc.getHealthyCopy().size()); List healthCheckList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.MASTER).build()); @@ -276,7 +264,7 @@ public void testHealthCheckTwoExternalReparent() throws InterruptedException { sendOnNextMessage(mockTablet2, Topodata.TabletType.MASTER, true, 20, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); healthCheckList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.MASTER).build()); @@ -289,7 +277,7 @@ public void testHealthCheckTwoExternalReparent() throws InterruptedException { new AtomicBoolean(false), Query.RealtimeStats.newBuilder().setCpuUsage(0.5).setSecondsBehindMaster(0).build()); - closeQueryService(mockTablet1, mockTablet2); + MockTablet.closeQueryService(mockTablet1, mockTablet2); printOk(); } @@ -302,10 +290,10 @@ public void testHealthCheckVerifiesTabletAlias() throws InterruptedException { printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); @@ -315,16 +303,16 @@ public void testHealthCheckVerifiesTabletAlias() throws InterruptedException { MockQueryServer.HealthCheckMessage message = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Next, streamHealthResponse); mockTablet.getHealthMessage().put(message); - Thread.sleep(200); + sleepMillisSeconds(200); TabletHealthCheck thc = hc.getHealthByAliasCopy().get(TopoProto.tabletAliasString(mockTablet.getTablet().getAlias())); Assert.assertEquals(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_MISMATCH, thc.getTabletStreamHealthDetailStatus().get().getStatus()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedException { + public void testHealthCheckRemoveTabletAfterReparent() { printComment("6. HealthCheck Test remove tablet after reparent"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -332,13 +320,13 @@ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedExcepti printComment("b. Add a no-serving Tablet"); // add master tablet - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); hc.addTablet(mockTablet.getTablet()); // add replica tablet - MockTablet mockTablet1 = buildMockTablet("cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); @@ -346,7 +334,7 @@ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedExcepti sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, true, 0, 0.5, 0); sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 2, hc.getHealthyCopy().size()); @@ -354,7 +342,7 @@ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedExcepti printComment("d. Modify the role of tablet"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); sendOnNextMessage(mockTablet1, Topodata.TabletType.MASTER, true, 10, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); @@ -363,7 +351,7 @@ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedExcepti printComment("remove tablet"); hc.removeTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); // healthcheck shouldn't response onNext message List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); @@ -372,29 +360,29 @@ public void testHealthCheckRemoveTabletAfterReparent() throws InterruptedExcepti } hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.MASTER)); Assert.assertEquals(1, hcList.size()); - closeQueryService(mockTablet, mockTablet1); + MockTablet.closeQueryService(mockTablet, mockTablet1); printOk(); } @Test - public void testHealthCheckOnNextBeforeRemove() throws InterruptedException { + public void testHealthCheckOnNextBeforeRemove() { printComment("6a. HealthCheck Test onNext before remove tablet"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -403,33 +391,33 @@ public void testHealthCheckOnNextBeforeRemove() throws InterruptedException { sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2); executorService.execute(() -> hc.removeTablet(mockTablet.getTablet())); - Thread.sleep(200); + sleepMillisSeconds(200); // healthcheck shouldn't response onNext message Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testHealthCheckOnNextAfterRemove() throws InterruptedException { + public void testHealthCheckOnNextAfterRemove() { printComment("6b. HealthCheck Test onNext after remove tablet"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -438,26 +426,26 @@ public void testHealthCheckOnNextAfterRemove() throws InterruptedException { executorService.execute(() -> hc.removeTablet(mockTablet.getTablet())); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2); - Thread.sleep(200); + sleepMillisSeconds(200); // healthcheck shouldn't response onNext message Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testHealthCheckTimeout() throws InterruptedException { + public void testHealthCheckTimeout() { printComment("7. HealthCheck Test when health check timeout"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); Assert.assertEquals(1, mockTablet.getQueryServer().getConnectCount()); @@ -465,14 +453,14 @@ public void testHealthCheckTimeout() throws InterruptedException { printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); printComment("d. Sleep and wait for check timeout"); - Thread.sleep(90 * 1000); + sleepMillisSeconds(90 * 1000); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); // user shouldn't get a checking timeout tablet @@ -483,34 +471,31 @@ public void testHealthCheckTimeout() throws InterruptedException { Assert.fail("HealthCheck should try to reconnect tablet query service"); } - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); } /** * test the functionality of getHealthyTabletStats * 建议统一getHealthyTabletStats,不要即返回null,也可能返回empty list - * - * @throws IOException - * @throws InterruptedException */ @Test - public void testGetHealthyTablet() throws InterruptedException { + public void testGetHealthyTablet() { printComment("9. HealthCheck Test the functionality of getHealthyTabletStats"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); Assert.assertEquals(1, mockTablet.getQueryServer().getConnectCount()); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); List hcList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.REPLICA).build()); Assert.assertEquals(1, hcList.size()); @@ -524,7 +509,7 @@ public void testGetHealthyTablet() throws InterruptedException { // update health with a change that won't change health array printComment("d. update health with realtime stats a change that will change health array"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 35); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.REPLICA).build()); Assert.assertEquals(1, hcList.size()); @@ -536,13 +521,13 @@ public void testGetHealthyTablet() throws InterruptedException { Query.RealtimeStats.newBuilder().setCpuUsage(0.2).setSecondsBehindMaster(35).build()); printComment("e. Add a second tablet"); - MockTablet mockTablet2 = buildMockTablet("cell", 11, "host2", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 11, "host2", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet2.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("f. Modify the status of Tablet2 to serving"); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.2, 10); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); Assert.assertEquals("Wrong number of healthy replica tablets", 2, hcList.size()); @@ -569,7 +554,7 @@ public void testGetHealthyTablet() throws InterruptedException { printComment("g. Modify the status of Tablet2 to no-serving"); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, false, 0, 0.2, 10); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); Assert.assertEquals("Wrong number of healthy replica tablets", 1, hcList.size()); @@ -582,7 +567,7 @@ public void testGetHealthyTablet() throws InterruptedException { printComment("h. Second tablet turns into a primary"); sendOnNextMessage(mockTablet2, Topodata.TabletType.MASTER, true, 10, 0.2, 0); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.MASTER).build()); Assert.assertEquals("Wrong number of healthy master tablets", 1, hcList.size()); @@ -595,7 +580,7 @@ public void testGetHealthyTablet() throws InterruptedException { printComment("i. Old replica goes into primary"); sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, true, 20, 0.2, 0); - Thread.sleep(200); + sleepMillisSeconds(200); // check we lost all replicas, and primary is new one hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); @@ -612,7 +597,7 @@ public void testGetHealthyTablet() throws InterruptedException { // old primary sending an old pin should be ignored sendOnNextMessage(mockTablet2, Topodata.TabletType.MASTER, true, 10, 0.2, 0); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.MASTER)); Assert.assertEquals("Wrong number of healthy master tablets", 1, hcList.size()); assertTabletHealthCheck(hcList.get(0), @@ -622,12 +607,14 @@ public void testGetHealthyTablet() throws InterruptedException { new AtomicBoolean(false), Query.RealtimeStats.newBuilder().setCpuUsage(0.2).setSecondsBehindMaster(0).build()); - closeQueryService(mockTablet, mockTablet2); + MockTablet.closeQueryService(mockTablet, mockTablet2); printOk(); } @Test public void testPrimaryInOtherCell() throws TopoException, InterruptedException { + TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); + TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); @@ -636,25 +623,27 @@ public void testPrimaryInOtherCell() throws TopoException, InterruptedException HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving primary Tablet in different cell"); - MockTablet mockTablet = buildMockTablet("cell2", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell2", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, true, 0, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("d.// check that PRIMARY tablet from other cell IS in healthy tablet list"); List hcList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.MASTER).build()); Assert.assertEquals(1, hcList.size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test public void testReplicaInOtherCell() throws TopoException, InterruptedException { + TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); + TopoServer topoServer = MemoryTopoFactory.newServerAndFactory("cell1", "cell2").getTopoServer(); startWatchTopo("k", topoServer, "cell1", "cell2"); @@ -663,13 +652,13 @@ public void testReplicaInOtherCell() throws TopoException, InterruptedException HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving replica Tablet"); - MockTablet mockTablet = buildMockTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("d. check that replica tablet IS in healthy tablet list"); List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); @@ -677,41 +666,41 @@ public void testReplicaInOtherCell() throws TopoException, InterruptedException printComment("e. Add a tablet as replica in different cell"); - MockTablet mockTablet1 = buildMockTablet("cell2", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell2", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("f. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("g. check that only REPLICA tablet from cell1 is in healthy tablet list"); hcList = hc.getHealthyTabletStats(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.REPLICA).build()); Assert.assertEquals(2, hcList.size()); - closeQueryService(mockTablet, mockTablet1); + MockTablet.closeQueryService(mockTablet, mockTablet1); printOk(); } @Test - public void testGetStandbyTablet() throws InterruptedException { + public void testGetStandbyTablet() { printComment("12. HealthCheck Test get healthy tablet maybe standby"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving replica Tablet"); - MockTablet mockTablet = buildMockTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - MockTablet mockTablet1 = buildMockTablet("cell1", 1, "b", "k", "s", portMap, Topodata.TabletType.RDONLY); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell1", 1, "b", "k", "s", portMap, Topodata.TabletType.RDONLY); hc.addTablet(mockTablet1.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of REPLICA Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("d. Try to get RDONLY tablet by standBy func"); List hcList = hc.getHealthyTabletStatsMaybeStandby(Query.Target.newBuilder().setKeyspace("k").setShard("s").setTabletType(Topodata.TabletType.RDONLY).build()); @@ -726,11 +715,11 @@ public void testGetStandbyTablet() throws InterruptedException { printComment("e. Modify the status of RDONLY Tablet to serving"); sendOnNextMessage(mockTablet1, Topodata.TabletType.RDONLY, true, 0, 0.25, 10); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("f. send onError the replica GRPC server"); sendOnErrorMessage(mockTablet); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("g. Try to get REPLICA tablet by standBy func"); hcList = hc.getHealthyTabletStatsMaybeStandby(createTarget(Topodata.TabletType.REPLICA)); @@ -743,52 +732,52 @@ public void testGetStandbyTablet() throws InterruptedException { new AtomicBoolean(false), Query.RealtimeStats.newBuilder().setCpuUsage(0.25).setSecondsBehindMaster(10).build()); - closeQueryService(mockTablet1); + MockTablet.closeQueryService(mockTablet1); printOk(); } @Test - public void testUnhealthyReplicaAsSecondsBehind() throws InterruptedException { + public void testUnhealthyReplicaAsSecondsBehind() { printComment("13. HealthCheck Test get healthy tablet maybe standby"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving replica Tablet"); - MockTablet mockTablet = buildMockTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of REPLICA Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 10); - Thread.sleep(200); + sleepMillisSeconds(200); List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); Assert.assertEquals("Wrong number of healthy replica tablets", 1, hcList.size()); printComment("e. Modify the value of seconds behind master of REPLICA Tablet"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 7201); - Thread.sleep(200); + sleepMillisSeconds(200); hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); if (hcList != null) { Assert.assertEquals("Wrong number of healthy replica tablets, it should be 0", 0, hcList.size()); } - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testMysqlPort0to3358() throws InterruptedException { + public void testMysqlPort0to3358() { printComment("14. HealthCheck Test in Tablet MySQL port changed from 0 to 3358"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving replica Tablet (MysqlPort = 0)"); defaultMysqlPort = 0; - MockTablet mockTablet = buildMockTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA, defaultMysqlPort); hc.addTablet(mockTablet.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); @@ -796,41 +785,41 @@ public void testMysqlPort0to3358() throws InterruptedException { defaultMysqlPort = 3358; Topodata.Tablet tablet = buildTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.replaceTablet(mockTablet.getTablet(), tablet); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); printComment("d. Modify the status of REPLICA Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 10); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); Assert.assertEquals("Wrong number of healthy replica tablets", 1, hcList.size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testMysqlPort3358to0() throws InterruptedException { + public void testMysqlPort3358to0() { printComment("15. HealthCheck Test in Tablet MySQL port changed from 3358 to 0"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); printComment("b. Add a no-serving Tablet(MysqlPort = 3358)"); - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet.getTablet()); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("c. Modify the status of Tablet to serving"); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -841,17 +830,17 @@ public void testMysqlPort3358to0() throws InterruptedException { defaultMysqlPort = 0; Topodata.Tablet tablet = buildTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.replaceTablet(mockTablet.getTablet(), tablet); - Thread.sleep(6000); + sleepMillisSeconds(6000); Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); - closeQueryService(mockTablet); + MockTablet.closeQueryService(mockTablet); printOk(); } @Test - public void testDoubleMaster() throws InterruptedException { + public void testDoubleMaster() { printComment("16. double master one no serving"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -859,13 +848,13 @@ public void testDoubleMaster() throws InterruptedException { printComment("b. Add a no-serving Tablet"); // add master tablet - MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, "a", "k", "s", portMap, Topodata.TabletType.MASTER); hc.addTablet(mockTablet.getTablet()); // add replica tablet - MockTablet mockTablet1 = buildMockTablet("cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, "b", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); @@ -873,18 +862,18 @@ public void testDoubleMaster() throws InterruptedException { sendOnNextMessage(mockTablet, Topodata.TabletType.MASTER, true, 0, 0.5, 0); sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 2, hc.getHealthyCopy().size()); printComment("d. Modify the role of tablet"); sendOnNextMessage(mockTablet1, Topodata.TabletType.MASTER, true, 10, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); printComment("e. Modify old master Tablet to no serving"); sendOnErrorMessage(mockTablet); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -896,6 +885,7 @@ public void testDoubleMaster() throws InterruptedException { Assert.assertTrue(CollectionUtils.isNotEmpty(healthyTabletStats)); Assert.assertEquals(1, healthyTabletStats.size()); + MockTablet.closeQueryService(mockTablet, mockTablet1); printOk(); } @@ -930,19 +920,19 @@ public void testHealthyListChecksum() { } @Test - public void testHealthyChecksumSetBehindMaster() throws InterruptedException { + public void testHealthyChecksumSetBehindMaster() { HealthCheck hc = getHealthCheck(); // add tablet String keyInHealthy = "k.s.replica"; - MockTablet mockTablet1 = buildMockTablet("cella", 7, "1.1.1.7", "k", "s", portMap, Topodata.TabletType.REPLICA); - MockTablet mockTablet2 = buildMockTablet("cella", 8, "1.1.1.8", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cellb", 67, "1.2.3.67", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cellb", 198, "1.2.3.198", "k", "s", portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); hc.addTablet(mockTablet2.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 2); - Thread.sleep(200); + sleepMillisSeconds(200); // sort list in healthy order by secondsBehindMaster hc.recomputeHealthyLocked(keyInHealthy); @@ -950,7 +940,7 @@ public void testHealthyChecksumSetBehindMaster() throws InterruptedException { sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 2); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); - Thread.sleep(200); + sleepMillisSeconds(200); // sort list in healthy order by secondsBehindMaster hc.recomputeHealthyLocked(keyInHealthy); @@ -959,11 +949,11 @@ public void testHealthyChecksumSetBehindMaster() throws InterruptedException { Assert.assertNotEquals(hc.getHealthyCopy().get(keyInHealthy).get(0).getTablet().getHostname(), hc.getHealthyCopy().get(keyInHealthy).get(1).getTablet().getHostname()); Assert.assertEquals("Wrong HealthyChecksum", firstCrc32, secondCrc32); - closeQueryService(mockTablet1, mockTablet2); + MockTablet.closeQueryService(mockTablet1, mockTablet2); } @Test - public void testConcurrentModificationException() throws InterruptedException { + public void testConcurrentModificationException() { thrown.expect(ConcurrentModificationException.class); List tabletHealthCheckList = new ArrayList<>(); @@ -995,26 +985,26 @@ public void testConcurrentModificationException() throws InterruptedException { mockGetHealthyTabletStats(healthy, target); } - Thread.sleep(2000); + sleepMillisSeconds(2000); } @Test - public void testHealthyConcurrentModificationException() throws InterruptedException { + public void testHealthyConcurrentModificationException() { HealthCheck hc = getHealthCheck(); String keyspace = "k"; String shard = "s"; // add replica tablet - MockTablet mockTablet0 = buildMockTablet("cell", 1, "a", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet0 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, "a", keyspace, shard, portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet0.getTablet()); // add replica tablet - MockTablet mockTablet1 = buildMockTablet("cell", 100, "b", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet1 = MockTablet.buildMockTablet(grpcCleanup, "cell", 100, "b", keyspace, shard, portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet1.getTablet()); // add replica tablet - MockTablet mockTablet2 = buildMockTablet("cell", 10, "c", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 10, "c", keyspace, shard, portMap, Topodata.TabletType.REPLICA); hc.addTablet(mockTablet2.getTablet()); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); @@ -1023,7 +1013,7 @@ public void testHealthyConcurrentModificationException() throws InterruptedExcep sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); - Thread.sleep(200); + sleepMillisSeconds(200); Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); @@ -1044,7 +1034,8 @@ public void testHealthyConcurrentModificationException() throws InterruptedExcep Assert.assertEquals("Wrong Tablet data", 3, healthyTabletStats.size()); } - Thread.sleep(2000); + sleepMillisSeconds(2000); + MockTablet.closeQueryService(mockTablet0, mockTablet1, mockTablet2); } private List mockGetHealthyTabletStats(Map> healthy, Query.Target target) { @@ -1070,13 +1061,6 @@ private void startWatchTopo(String keyspaceName, TopoServer topoServer, String.. } } - private void closeQueryService(MockTablet... tablets) throws InterruptedException { - MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null); - for (MockTablet tablet : tablets) { - tablet.getHealthMessage().put(close); - } - } - protected HealthCheck getHealthCheck() { HealthCheck hc = HealthCheck.INSTANCE; Assert.assertEquals(0, hc.getHealthByAliasCopy().size()); @@ -1095,30 +1079,6 @@ private void assertTabletHealthCheck(TabletHealthCheck actualTabletHealthCheck, Assert.assertEquals("Wrong realtime stats", expectStats, actualTabletHealthCheck.getStats()); } - protected MockTablet buildMockTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { - String serverName = InProcessServerBuilder.generateName(); - BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); - MockQueryServer queryServer = new MockQueryServer(healthMessage); - Server server = null; - try { - server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - grpcCleanup.register(server); - - ManagedChannel channel = - InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); - grpcCleanup.register(channel); - Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type); - - IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); - TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); - - return new MockTablet(tablet, healthMessage, queryServer, server, channel); - } - private Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build(); @@ -1165,19 +1125,4 @@ protected void sendOnErrorMessage(MockTablet mockTablet) { throw new RuntimeException(e); } } - - @AllArgsConstructor - @Getter - private class MockTablet { - - private final Topodata.Tablet tablet; - - private final BlockingQueue healthMessage; - - private final MockQueryServer queryServer; - - private final Server server; - - private final ManagedChannel channel; - } } diff --git a/src/test/java/com/jd/jdbc/discovery/MockTablet.java b/src/test/java/com/jd/jdbc/discovery/MockTablet.java new file mode 100644 index 0000000..b4c2c5e --- /dev/null +++ b/src/test/java/com/jd/jdbc/discovery/MockTablet.java @@ -0,0 +1,143 @@ +package com.jd.jdbc.discovery; + +import com.jd.jdbc.queryservice.CombinedQueryService; +import com.jd.jdbc.queryservice.IParentQueryService; +import com.jd.jdbc.queryservice.MockQueryServer; +import com.jd.jdbc.queryservice.TabletDialerAgent; +import io.grpc.ManagedChannel; +import io.grpc.Server; +import io.grpc.inprocess.InProcessChannelBuilder; +import io.grpc.inprocess.InProcessServerBuilder; +import io.grpc.testing.GrpcCleanupRule; +import io.vitess.proto.Topodata; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; +import lombok.Getter; + +@Getter +public class MockTablet { + + private final Topodata.Tablet tablet; + + private final BlockingQueue healthMessage; + + private final MockQueryServer queryServer; + + private final Server server; + + private final ManagedChannel channel; + + private MockTablet(Topodata.Tablet tablet, BlockingQueue healthMessage, MockQueryServer queryServer, Server server, ManagedChannel channel) { + this.tablet = tablet; + this.healthMessage = healthMessage; + this.queryServer = queryServer; + this.server = server; + this.channel = channel; + } + + public static MockTablet buildMockTablet(Topodata.Tablet tablet, GrpcCleanupRule grpcCleanup) { + String serverName = InProcessServerBuilder.generateName(); + BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); + MockQueryServer queryServer = new MockQueryServer(healthMessage); + Server server = null; + try { + server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + grpcCleanup.register(server); + + ManagedChannel channel = + InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); + grpcCleanup.register(channel); + + IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); + TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); + + return new MockTablet(tablet, healthMessage, queryServer, server, channel); + } + + public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, + Topodata.TabletType type) { + String serverName = InProcessServerBuilder.generateName(); + BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); + MockQueryServer queryServer = new MockQueryServer(healthMessage); + Server server = null; + try { + server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + grpcCleanup.register(server); + + ManagedChannel channel = + InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); + grpcCleanup.register(channel); + + Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, 3358); + IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); + TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); + + return new MockTablet(tablet, healthMessage, queryServer, server, channel); + } + + public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, + Topodata.TabletType type, + int defaultMysqlPort) { + String serverName = InProcessServerBuilder.generateName(); + BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); + MockQueryServer queryServer = new MockQueryServer(healthMessage); + Server server = null; + try { + server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + grpcCleanup.register(server); + + ManagedChannel channel = + InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); + grpcCleanup.register(channel); + + Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, defaultMysqlPort); + IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); + TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); + + return new MockTablet(tablet, healthMessage, queryServer, server, channel); + } + + public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type, + int defaultMysqlPort) { + Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build(); + Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder() + .setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort); + for (Map.Entry portEntry : portMap.entrySet()) { + tabletBuilder.putPortMap(portEntry.getKey(), portEntry.getValue()); + } + return tabletBuilder.build(); + } + + public static void closeQueryService(MockTablet... tablets) { + MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null); + for (MockTablet tablet : tablets) { + try { + tablet.getHealthMessage().put(close); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + TabletDialerAgent.close(tablet.getTablet()); + } + } + + public void close() { + healthMessage.clear(); + server.shutdownNow(); + channel.shutdownNow(); + } +} diff --git a/src/test/java/com/jd/jdbc/queryservice/TabletDialerAgent.java b/src/test/java/com/jd/jdbc/queryservice/TabletDialerAgent.java index 8ec11a2..82521d3 100644 --- a/src/test/java/com/jd/jdbc/queryservice/TabletDialerAgent.java +++ b/src/test/java/com/jd/jdbc/queryservice/TabletDialerAgent.java @@ -28,4 +28,8 @@ public static void registerTabletCache(final Topodata.Tablet tablet, final IPare public static void clearTabletCache() { TabletDialer.clearTabletCache(); } + + public static void close(final Topodata.Tablet tablet) { + TabletDialer.close(tablet); + } } diff --git a/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java b/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java index 0b64b35..35750a5 100644 --- a/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java +++ b/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java @@ -26,6 +26,8 @@ import lombok.Synchronized; import org.apache.commons.lang3.RandomUtils; +import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; + @Getter public class MemoryTopoFactory implements TopoFactory { @@ -57,7 +59,7 @@ public MemoryTopoServer.Node newFile(String name, byte[] contents, MemoryTopoSer return new MemoryTopoServer.Node(name, getNextVersion(), contents, new ConcurrentHashMap<>(), parent, false); } - private MemoryTopoServer.Node newDirectory(String name, MemoryTopoServer.Node parent) { + public MemoryTopoServer.Node newDirectory(String name, MemoryTopoServer.Node parent) { return newFile(name, null, parent); } @@ -89,6 +91,20 @@ public MemoryTopoServer.Node nodeByPath(String cell, String filePath) { return node; } + public void deleteNode(String deleteCell) throws TopoException { + String cell = "global"; + String filePath = "cells"; + MemoryTopoServer.Node node = nodeByPath(cell, filePath); + if (node == null) { + throw TopoException.wrap(NO_NODE, filePath); + } + + if (!node.isDirectory()) { + throw TopoException.wrap("node " + filePath + " in cell " + cell + " is not a directory"); + } + node.getChildren().remove(deleteCell); + } + public MemoryTopoServer.Node getOrCreatePath(String cell, String filePath) { MemoryTopoServer.Node node = this.cells.get(cell); if (node == null) { diff --git a/src/test/java/com/jd/jdbc/topo/MemoryTopoServer.java b/src/test/java/com/jd/jdbc/topo/MemoryTopoServer.java index 2f1b2ee..a3b1417 100644 --- a/src/test/java/com/jd/jdbc/topo/MemoryTopoServer.java +++ b/src/test/java/com/jd/jdbc/topo/MemoryTopoServer.java @@ -19,8 +19,9 @@ package com.jd.jdbc.topo; import com.jd.jdbc.context.IContext; -import static com.jd.jdbc.topo.TopoExceptionCode.NODE_EXISTS; -import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; +import com.jd.jdbc.context.VtBackgroundContext; +import com.jd.jdbc.topo.topoproto.TopoProto; +import io.vitess.proto.Topodata; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -29,6 +30,10 @@ import lombok.AllArgsConstructor; import lombok.Getter; +import static com.jd.jdbc.topo.TopoExceptionCode.NODE_EXISTS; +import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE; +import static com.jd.jdbc.topo.TopoServer.TABLET_FILE; + @AllArgsConstructor public class MemoryTopoServer implements TopoConnection { @@ -89,7 +94,7 @@ public CompletableFuture> listDirFuture(IContext ctx, String dirP public Version create(IContext ctx, String filePath, byte[] contents) throws TopoException { byte[] contentsCopy = contents; if (contentsCopy == null) { - contentsCopy = new byte[]{}; + contentsCopy = new byte[] {}; } String[] dirFile = pathSplit(filePath); @@ -113,13 +118,13 @@ public Version create(IContext ctx, String filePath, byte[] contents) throws Top private String[] pathSplit(String path) { int i = lastSlash(path); - String[] strings = new String[]{path.substring(0, i+1), path.substring(i+1)}; + String[] strings = new String[] {path.substring(0, i + 1), path.substring(i + 1)}; return strings; } private int lastSlash(String path) { int i = path.length() - 1; - for (; i >=0 ; i--) { + for (; i >= 0; i--) { if (path.charAt(i) == '/') { return i; } @@ -156,8 +161,8 @@ public List getTabletsByCell(IContext ctx, String filePath) thr ConnGetResponse connGetResponse; for (Map.Entry childMap : node.getChildren().entrySet()) { connGetResponse = new ConnGetResponse(); - connGetResponse.setContents(childMap.getValue().getContents()); - connGetResponse.setVersion(childMap.getValue()); + connGetResponse.setContents(childMap.getValue().getChildren().get(TABLET_FILE).getContents()); + connGetResponse.setVersion(childMap.getValue().getChildren().get(TABLET_FILE)); connGetResponseList.add(connGetResponse); } return connGetResponseList; @@ -185,6 +190,37 @@ public void close() { } + public static void addCellInMemoryTopo(TopoServer topoServer, String cell) throws TopoException { + TopoConnection globalCell = topoServer.globalCell; + globalCell.create(new VtBackgroundContext(), Topo.pathForCellInfo(cell), null); + TopoFactory factory = topoServer.topoFactory; + if (factory instanceof MemoryTopoFactory) { + MemoryTopoFactory memoryTopoFactory = (MemoryTopoFactory) factory; + memoryTopoFactory.getCells().put(cell, memoryTopoFactory.newDirectory(cell, null)); + } + } + + public static void deleteCellInMemoryTopo(IContext ctx, TopoServer topoServer, String cell) throws TopoException { + TopoConnection globalCell = topoServer.globalCell; + TopoFactory factory = topoServer.topoFactory; + if (factory instanceof MemoryTopoFactory) { + MemoryTopoFactory memoryTopoFactory = (MemoryTopoFactory) factory; + memoryTopoFactory.deleteNode(cell); + memoryTopoFactory.getCells().remove(cell); + } + List allCells = topoServer.getAllCells(ctx); + } + + // createTablet creates a new tablet and all associated paths for the + // replication graph. + public static void createTablet(IContext ctx, TopoServer topoServer, Topodata.Tablet tablet) throws TopoException { + TopoConnection conn = topoServer.connForCell(ctx, tablet.getAlias().getCell()); + byte[] data = tablet.toByteArray(); + //Topodata.Tablet testTablet = Topodata.Tablet.parseFrom(data); + String tabletPath = Topo.pathForTabletAlias(TopoProto.tabletAliasString(tablet.getAlias())); + conn.create(ctx, tabletPath, data); + } + @AllArgsConstructor @Getter public static class Node implements Version { diff --git a/src/test/java/com/jd/jdbc/topo/discovery/TestHealthCheck.java b/src/test/java/com/jd/jdbc/topo/discovery/TestHealthCheck.java index 1b06913..663d70e 100644 --- a/src/test/java/com/jd/jdbc/topo/discovery/TestHealthCheck.java +++ b/src/test/java/com/jd/jdbc/topo/discovery/TestHealthCheck.java @@ -33,19 +33,21 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; public class TestHealthCheck { private final HealthCheck hc = HealthCheck.INSTANCE; + @Ignore @Test public void waitForAllServingTabletsTimeoutTest() throws Exception { IContext ctx = VtContext.withCancel(VtContext.background()); List targetList = new CopyOnWriteArrayList<>(); targetList.add(Query.Target.newBuilder() - .setCell("sh61aa") - .setKeyspace("vtdriver3") + .setCell("aa") + .setKeyspace("vtdriver") .setShard("-80") .setTabletType(Topodata.TabletType.MASTER) .build()); diff --git a/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java b/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java index c88e87f..a3338d0 100644 --- a/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java +++ b/src/test/java/com/jd/jdbc/vitess/VitessJdbcUrlParserTest.java @@ -16,18 +16,20 @@ package com.jd.jdbc.vitess; +import com.jd.jdbc.discovery.TopologyWatcherManager; import com.jd.jdbc.sqlparser.utils.StringUtils; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import testsuite.TestSuite; -import testsuite.internal.environment.TestSuiteEnv; import java.net.URI; import java.net.URISyntaxException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import testsuite.TestSuite; +import testsuite.internal.environment.TestSuiteEnv; + import static com.jd.jdbc.vitess.VitessJdbcUrlParser.JDBC_VITESS_PREFIX; import static testsuite.internal.TestSuiteShardSpec.TWO_SHARDS; @@ -48,6 +50,8 @@ public class VitessJdbcUrlParserTest extends TestSuite { private Connection conn; private void init() throws SQLException { + TopologyWatcherManager.INSTANCE.resetScheduledExecutor(); + String connecturlionUrl = getConnectionUrl(env); URI uri = null; diff --git a/src/test/java/testsuite/TestSuite.java b/src/test/java/testsuite/TestSuite.java index c41c776..f4e2d29 100644 --- a/src/test/java/testsuite/TestSuite.java +++ b/src/test/java/testsuite/TestSuite.java @@ -158,6 +158,10 @@ protected String getPassword(TestSuiteEnv env) { return env.getPassword(); } + protected String getCell(TestSuiteEnv env) { + return env.getCell(); + } + protected static class Driver { public static DriverEnv of(TestSuiteShardSpec shardSpec) { return new DriverEnv(shardSpec); diff --git a/src/test/java/testsuite/internal/TestSuiteConn.java b/src/test/java/testsuite/internal/TestSuiteConn.java index 5645a4a..8fa4ff2 100644 --- a/src/test/java/testsuite/internal/TestSuiteConn.java +++ b/src/test/java/testsuite/internal/TestSuiteConn.java @@ -44,4 +44,8 @@ default String getUser() { default String getPassword() { return null; } + + default String getCell() { + return null; + } } diff --git a/src/test/java/testsuite/internal/config/DriverJdbcCfg.java b/src/test/java/testsuite/internal/config/DriverJdbcCfg.java index 9a4d255..1f40c2b 100644 --- a/src/test/java/testsuite/internal/config/DriverJdbcCfg.java +++ b/src/test/java/testsuite/internal/config/DriverJdbcCfg.java @@ -27,6 +27,10 @@ public void setCell(String cell) { this.cell = cell; } + public String getCell() { + return this.cell; + } + public void setSocketTimeout(String socketTimeout) { this.socketTimeout = socketTimeout; } @@ -38,8 +42,8 @@ public String getPrefix() { @Override public String getJdbcUrl() { - return String.format("%s/%s?cell=%s&user=%s&password=%s&serverTimezone=%s&characterEncoding=%s&socketTimeout=%s", - this.urlPrefix, this.keyspace, this.cell, this.username, this.password, this.serverTimezone, + return String.format("%s/%s?user=%s&password=%s&serverTimezone=%s&characterEncoding=%s&socketTimeout=%s", + this.urlPrefix, this.keyspace, this.username, this.password, this.serverTimezone, this.characterEncoding, this.socketTimeout); } diff --git a/src/test/java/testsuite/internal/environment/DriverEnv.java b/src/test/java/testsuite/internal/environment/DriverEnv.java index 686988b..3369cdc 100644 --- a/src/test/java/testsuite/internal/environment/DriverEnv.java +++ b/src/test/java/testsuite/internal/environment/DriverEnv.java @@ -20,9 +20,10 @@ import java.sql.SQLException; import testsuite.internal.TestSuiteShardSpec; import testsuite.internal.config.DriverJdbcCfg; -import static testsuite.internal.config.TestSuiteCfgPath.DEV; import testsuite.internal.config.TestSuiteCfgReader; +import static testsuite.internal.config.TestSuiteCfgPath.DEV; + public class DriverEnv extends TestSuiteEnv { public DriverEnv(TestSuiteShardSpec shardSpec) { @@ -69,4 +70,10 @@ public String getPassword() { DriverJdbcCfg cfg = TestSuiteCfgReader.read(DriverJdbcCfg.class, this.shardSpec, DEV); return cfg.getPassword(); } + + @Override + public String getCell() { + DriverJdbcCfg cfg = TestSuiteCfgReader.read(DriverJdbcCfg.class, this.shardSpec, DEV); + return cfg.getCell(); + } }