diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index 5e110e2..01417b4 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -25,6 +25,7 @@ import com.jd.jdbc.queryservice.IQueryService; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; +import com.jd.jdbc.sqlparser.utils.StringUtils; import com.jd.jdbc.topo.topoproto.TopoProto; import com.jd.jdbc.util.threadpool.impl.VtHealthCheckExecutorService; import io.netty.util.internal.StringUtil; @@ -224,7 +225,8 @@ public List getHealthyTablets(String keyspace) { } public void addTablet(Topodata.Tablet tablet) { - if (tablet.getPortMapMap().get("grpc") == null) { + if (checkTabletInfoMissing(tablet)) { + log.error("tablet Information missing,tablet = " + tablet); return; } Query.Target target = Query.Target.newBuilder().setKeyspace(tablet.getKeyspace()).setShard(tablet.getShard()).setTabletType(tablet.getType()).build(); @@ -529,4 +531,12 @@ public void waitForAllServingTablets(IContext ctx, List targetList } } } + + private boolean checkTabletInfoMissing(Topodata.Tablet tablet) { + return StringUtils.isEmpty(tablet.getKeyspace()) || StringUtils.isEmpty(tablet.getShard()) + || Objects.equals(Topodata.TabletType.UNRECOGNIZED, tablet.getType()) + || StringUtils.isEmpty(tablet.getHostname()) || StringUtils.isEmpty(tablet.getMysqlHostname()) + || tablet.getMysqlPort() == 0 || Objects.equals(Topodata.TabletAlias.getDefaultInstance(), tablet.getAlias()) + || tablet.getPortMapMap().get("grpc") == null || tablet.getPortMapMap().get("grpc") == 0; + } } diff --git a/src/main/java/com/jd/jdbc/pool/HikariUtil.java b/src/main/java/com/jd/jdbc/pool/HikariUtil.java index 32a71e3..152bf2c 100644 --- a/src/main/java/com/jd/jdbc/pool/HikariUtil.java +++ b/src/main/java/com/jd/jdbc/pool/HikariUtil.java @@ -81,7 +81,7 @@ public static HikariConfig getHikariConfig(Topodata.Tablet tablet, String user, hikariConfig.setScheduledExecutor(HOUSEKEEPER_EXECUTOR); hikariConfig.setMetricsTrackerFactory(METRICS_TRACKER_FACTORY); if (logger.isDebugEnabled()) { - logger.debug("hikariConfig:nativeUrl=" + nativeUrl + " poolName=" + TopoProto.getPoolName(tablet) + " schema=" + realSchema); + logger.debug("hikariConfig:nativeUrl=" + nativeUrl + " poolName=" + TopoProto.getPoolName(tablet)); } return hikariConfig; } diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index 3d272ff..02f6980 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -63,6 +63,8 @@ public class HealthCheckTest extends TestSuite { private final Map portMap = new HashMap<>(); + private int defaultMysqlPort = 3358; + @Rule public GrpcCleanupRule grpcCleanup; @@ -434,7 +436,7 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx } @Test - public void teestHealthCheckTimeout() throws IOException, InterruptedException { + public void testHealthCheckTimeout() throws IOException, InterruptedException { printComment("7. HealthCheck Test when health check timeout"); printComment("a. Get Health"); HealthCheck hc = getHealthCheck(); @@ -765,6 +767,78 @@ public void testUnhealthyReplicaAsSecondsBehind() throws IOException, Interrupte printOk(); } + @Test + public void testMysqlPort0to3358() throws IOException, InterruptedException { + printComment("14. HealthCheck Test in Tablet MySQL port changed from 0 to 3358"); + printComment("a. Get Health"); + HealthCheck hc = getHealthCheck(); + + printComment("b. Add a no-serving replica Tablet (MysqlPort = 0)"); + defaultMysqlPort = 0; + MockTablet mockTablet = buildMockTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet.getTablet()); + Thread.sleep(200); + Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); + + printComment("c. replace a no-serving replica Tablet (MysqlPort = 3358)"); + defaultMysqlPort = 3358; + Topodata.Tablet tablet = buildTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + hc.replaceTablet(mockTablet.getTablet(), tablet); + Thread.sleep(200); + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); + + printComment("d. Modify the status of REPLICA Tablet to serving"); + sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 10); + Thread.sleep(200); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); + List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); + Assert.assertEquals("Wrong number of healthy replica tablets", 1, hcList.size()); + + closeQueryService(mockTablet); + printOk(); + } + + @Test + public void testMysqlPort3358to0() throws IOException, InterruptedException { + printComment("15. HealthCheck Test in Tablet MySQL port changed from 3358 to 0"); + printComment("a. Get Health"); + HealthCheck hc = getHealthCheck(); + + printComment("b. Add a no-serving Tablet(MysqlPort = 3358)"); + MockTablet mockTablet = buildMockTablet("cell", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet.getTablet()); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); + Thread.sleep(200); + + printComment("c. Modify the status of Tablet to serving"); + sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); + + Thread.sleep(200); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); + List hcList = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); + Assert.assertEquals("Wrong number of healthy replica tablets", 1, hcList.size()); + + printComment("d. replace a no-serving replica Tablet (MysqlPort = 0)"); + defaultMysqlPort = 0; + Topodata.Tablet tablet = buildTablet("cell1", 0, "a", "k", "s", portMap, Topodata.TabletType.REPLICA); + hc.replaceTablet(mockTablet.getTablet(), tablet); + Thread.sleep(6000); + + Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); + + closeQueryService(mockTablet); + printOk(); + } + private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) { for (String cell : cells) { TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, keyspaceName); @@ -818,7 +892,8 @@ private MockTablet buildMockTablet(String cell, Integer uid, String hostName, St private Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build(); - Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder().setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type); + Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder() + .setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort); for (Map.Entry portEntry : portMap.entrySet()) { tabletBuilder.putPortMap(portEntry.getKey(), portEntry.getValue()); } diff --git a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java index 2d05d8a..4780323 100644 --- a/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java +++ b/src/test/java/com/jd/jdbc/queryservice/MockQueryServer.java @@ -97,7 +97,11 @@ private void notifyAll(HealthCheckMessage message) { break; default: observers.forEach(observer -> { - observer.onNext(message.getMessage()); + try { + observer.onNext(message.getMessage()); + } catch (Exception e) { + e.printStackTrace(); + } }); System.out.println("server: receive message: " + message.getMessage()); } diff --git a/src/test/java/com/jd/jdbc/vitess/VitessDriverConnectionPoolTest.java b/src/test/java/com/jd/jdbc/vitess/VitessDriverConnectionPoolTest.java index a716a17..099c503 100644 --- a/src/test/java/com/jd/jdbc/vitess/VitessDriverConnectionPoolTest.java +++ b/src/test/java/com/jd/jdbc/vitess/VitessDriverConnectionPoolTest.java @@ -86,11 +86,7 @@ public void init() { vtMaximumPoolSize = (random.nextInt(150) + 1) + ""; vtConnectionTimeout = (random.nextInt(50000) + 250) + ""; vtIdleTimeout = (random.nextInt(600000) + 10000) + ""; - - propsUrl = - "vtMinimumIdle=" + vtMinimumIdle + ";vtMaximumPoolSize=" + vtMaximumPoolSize + ";vtConnectionInitSql=select 1;vtConnectionTestQuery=select 1;vtConnectionTimeout=" + vtConnectionTimeout + - ";vtIdleTimeout=" + vtIdleTimeout; - + propsUrl = "vtMinimumIdle=" + vtMinimumIdle + ";vtMaximumPoolSize=" + vtMaximumPoolSize + ";vtConnectionInitSql=select 1;vtConnectionTestQuery=select 1;vtConnectionTimeout=" + vtConnectionTimeout + ";vtIdleTimeout=" + vtIdleTimeout; } public void test0() throws NoSuchFieldException, IllegalAccessException { @@ -108,7 +104,6 @@ public void test0() throws NoSuchFieldException, IllegalAccessException { Assert.assertEquals("select 1", hikariConfig.getConnectionTestQuery() + ""); Assert.assertEquals(vtConnectionTimeout, hikariConfig.getConnectionTimeout() + ""); Assert.assertEquals(vtIdleTimeout, hikariConfig.getIdleTimeout() + ""); - } public void testInit() { @@ -143,7 +138,6 @@ public void testHikari() throws Exception { conn.close(); } - @Test public void testDruid1() throws Exception { print(); @@ -183,7 +177,6 @@ public void testDruid2() throws Exception { connection.close(); } - @Test public void testdbcp() throws Exception { print(); @@ -201,7 +194,6 @@ public void testdbcp() throws Exception { connection.close(); } - @Test public void testc3p0() throws Exception { print(); @@ -256,8 +248,7 @@ public void testDefaultConnectionPoolSize() throws NoSuchFieldException, Illegal printNormal("testDefaultConnectionPoolSize>>>>>"); } - public void checkInnerPoolSize(String url, Topodata.TabletType type, int expectedMin, int expectedMax) - throws SQLException, NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException, InterruptedException { + public void checkInnerPoolSize(String url, Topodata.TabletType type, int expectedMin, int expectedMax) throws SQLException, NoSuchFieldException, IllegalAccessException { printInfo("url: " + url); Connection conn = DriverManager.getConnection(url);