From 1a087a8ac23154dfb432cad741d01a32df1f6aeb Mon Sep 17 00:00:00 2001 From: wangweicugw Date: Fri, 28 Jul 2023 15:05:02 +0800 Subject: [PATCH] Fix bug caused by IP reuse --- .../com/jd/jdbc/discovery/HealthCheck.java | 8 +- .../jd/jdbc/queryservice/TabletDialer.java | 19 ++-- .../jd/jdbc/discovery/HealthCheckTest.java | 87 +++++++++++++++++++ .../com/jd/jdbc/discovery/MockTablet.java | 52 ++--------- .../jd/jdbc/topo/etcd2topo/ServerTest.java | 11 +-- 5 files changed, 116 insertions(+), 61 deletions(-) diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index 7c70eb0..016a849 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -374,8 +374,7 @@ public void replaceTablet(Topodata.Tablet oldTablet, Topodata.Tablet newTablet) this.addTablet(newTablet); } - public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget, - final boolean trivialUpdate, boolean up) { + public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget, final boolean trivialUpdate, boolean up) { String tabletAlias = TopoProto.tabletAliasString(th.getTablet().getAlias()); String targetKey = keyFromTarget(th.getTarget()); boolean targetChanged = preTarget.getTabletType() != th.getTarget().getTabletType() @@ -388,11 +387,16 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge return; } if (targetChanged) { + // keyspace and shard are not expected to change, but just in case ... + // move this tabletHealthCheck to the correct map String oldTargetKey = keyFromTarget(preTarget); this.healthData.get(oldTargetKey).remove(tabletAlias); MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>()); } + // add it to the map by target and create the map record if needed + MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>()); this.healthData.get(targetKey).put(tabletAlias, th); + boolean isPrimary = th.getTarget().getTabletType() == Topodata.TabletType.MASTER; if (isPrimary) { if (up) { diff --git a/src/main/java/com/jd/jdbc/queryservice/TabletDialer.java b/src/main/java/com/jd/jdbc/queryservice/TabletDialer.java index e584e09..ee87f31 100644 --- a/src/main/java/com/jd/jdbc/queryservice/TabletDialer.java +++ b/src/main/java/com/jd/jdbc/queryservice/TabletDialer.java @@ -18,6 +18,7 @@ package com.jd.jdbc.queryservice; +import com.jd.jdbc.topo.topoproto.TopoProto; import com.jd.jdbc.util.threadpool.JdkUtil; import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder; import com.jd.jdbc.util.threadpool.impl.TabletNettyExecutorService; @@ -40,11 +41,13 @@ public class TabletDialer { private static final Map TABLET_QUERY_SERVICE_CACHE = new ConcurrentHashMap<>(128 + 1); public static IParentQueryService dial(final Topodata.Tablet tablet) { - final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc"); - if (TABLET_QUERY_SERVICE_CACHE.containsKey(addr)) { - return TABLET_QUERY_SERVICE_CACHE.get(addr); + final String aliasString = TopoProto.tabletAliasString(tablet.getAlias()); + IParentQueryService queryService = TABLET_QUERY_SERVICE_CACHE.get(aliasString); + if (queryService != null) { + return queryService; } + String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc"); ManagedChannel channel = NettyChannelBuilder.forTarget(addr).usePlaintext() .offloadExecutor(TabletNettyExecutorService.getNettyExecutorService()) .executor(TabletNettyExecutorService.getNettyExecutorService()) @@ -53,18 +56,18 @@ public static IParentQueryService dial(final Topodata.Tablet tablet) { .keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); - TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService); + TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService); return combinedQueryService; } public static void close(final Topodata.Tablet tablet) { - final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc"); - TABLET_QUERY_SERVICE_CACHE.remove(addr); + final String aliasString = TopoProto.tabletAliasString(tablet.getAlias()); + TABLET_QUERY_SERVICE_CACHE.remove(aliasString); } protected static void registerTabletCache(final Topodata.Tablet tablet, final IParentQueryService combinedQueryService) { - final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc"); - TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService); + final String aliasString = TopoProto.tabletAliasString(tablet.getAlias()); + TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService); } protected static void clearTabletCache() { diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index 3ff1836..0bb71ec 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -18,6 +18,7 @@ package com.jd.jdbc.discovery; +import com.google.common.collect.Lists; import com.jd.jdbc.common.util.CollectionUtils; import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; @@ -44,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; +import lombok.AllArgsConstructor; +import lombok.ToString; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -1038,6 +1041,90 @@ public void testHealthyConcurrentModificationException() { MockTablet.closeQueryService(mockTablet0, mockTablet1, mockTablet2); } + @Test + public void ipReUse() { + @AllArgsConstructor + @ToString + class TestCase { + final Topodata.TabletType oldType; + + final Topodata.TabletType newType; + } + TestCase testCase1 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.MASTER); + TestCase testCase2 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.REPLICA); + TestCase testCase3 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY); + TestCase testCase4 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.MASTER); + TestCase testCase5 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.REPLICA); + TestCase testCase6 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.RDONLY); + TestCase testCase7 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER); + TestCase testCase8 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.REPLICA); + TestCase testCase9 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.RDONLY); + ArrayList testCases = Lists.newArrayList(testCase1, testCase2, testCase3, testCase4, testCase5, testCase6, testCase7, testCase8, testCase9); + + for (TestCase testCase : testCases) { + printComment("HealthCheck Test when Tablet ip reuse"); + HealthCheck hc = getHealthCheck(); + + printComment("1. Add a no-serving Tablet"); + String ip = "127.0.0.1"; + MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, ip, "k", "s", portMap, testCase.oldType); + hc.addTablet(mockTablet.getTablet()); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy())); + sleepMillisSeconds(200); + + printComment("2. Modify the status of Tablet to serving"); + sendOnNextMessage(mockTablet, testCase.oldType, true, 0, 0.5, 0); + + sleepMillisSeconds(200); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy())); + + printComment("3. Modify the status of Tablet to no serving"); + sendOnNextMessage(mockTablet, testCase.oldType, false, 0, 0.5, 0); + + sleepMillisSeconds(200); + + printComment("4. Add another no-serving new Tablet"); + MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, ip, "k2", "s", portMap, testCase.newType); + hc.addTablet(mockTablet2.getTablet()); + + Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy())); + sleepMillisSeconds(200); + + printComment("5. Modify the status of new Tablet to serving"); + sendOnNextMessage(mockTablet2, testCase.newType, true, 0, 0.5, 1); + + sleepMillisSeconds(200); + + Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy())); + + printComment("4. removeTablet old Tablet"); + hc.removeTablet(mockTablet.getTablet()); + + sleepMillisSeconds(2000); + + Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy())); + + MockTablet.closeQueryService(mockTablet, mockTablet2); + printOk("test ipReUse success,testCase :" + testCase); + HealthCheck.resetHealthCheck(); + } + } + + private int getHealthySize(Map> map) { + int size = 0; + for (Map.Entry> entry : map.entrySet()) { + size += entry.getValue().size(); + } + return size; + } + private List mockGetHealthyTabletStats(Map> healthy, Query.Target target) { List list = healthy.get(HealthCheck.keyFromTarget(target)); if (null == list || list.isEmpty()) { diff --git a/src/test/java/com/jd/jdbc/discovery/MockTablet.java b/src/test/java/com/jd/jdbc/discovery/MockTablet.java index 1dc4e39..8dd8b92 100644 --- a/src/test/java/com/jd/jdbc/discovery/MockTablet.java +++ b/src/test/java/com/jd/jdbc/discovery/MockTablet.java @@ -81,57 +81,17 @@ public static MockTablet buildMockTablet(Topodata.Tablet tablet, GrpcCleanupRule public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type) { - String serverName = InProcessServerBuilder.generateName(); - BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); - MockQueryServer queryServer = new MockQueryServer(healthMessage); - Server server = null; - try { - server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - grpcCleanup.register(server); - - ManagedChannel channel = - InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); - grpcCleanup.register(channel); - - Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, 3358); - IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); - TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); - - return new MockTablet(tablet, healthMessage, queryServer, server, channel); + return buildMockTablet(grpcCleanup, cell, uid, hostName, keyspaceName, shard, portMap, type, 3358); } public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, - Topodata.TabletType type, - int defaultMysqlPort) { - String serverName = InProcessServerBuilder.generateName(); - BlockingQueue healthMessage = new ArrayBlockingQueue<>(2); - MockQueryServer queryServer = new MockQueryServer(healthMessage); - Server server = null; - try { - server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start(); - } catch (IOException e) { - throw new RuntimeException(e); - } - - grpcCleanup.register(server); - - ManagedChannel channel = - InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build(); - grpcCleanup.register(channel); - + Topodata.TabletType type, int defaultMysqlPort) { Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, defaultMysqlPort); - IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet); - TabletDialerAgent.registerTabletCache(tablet, combinedQueryService); - - return new MockTablet(tablet, healthMessage, queryServer, server, channel); + return buildMockTablet(tablet, grpcCleanup); } - public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type, - int defaultMysqlPort) { + private static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map portMap, Topodata.TabletType type, + int defaultMysqlPort) { Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build(); Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder() .setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort); @@ -141,7 +101,7 @@ public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostN return tabletBuilder.build(); } - public static void closeQueryService(MockTablet... tablets) { + public static void closeQueryService(MockTablet... tablets) { MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null); for (MockTablet tablet : tablets) { try { diff --git a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java index b70fe57..e780201 100644 --- a/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java +++ b/src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java @@ -45,6 +45,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -76,7 +77,7 @@ public class ServerTest extends TestSuite { //The ip address needs to be modified to the ip address of the machine where etcd is located public static String TOPO_GLOBAL_SERVER_ADDRESS = "http://127.0.0.1:2379"; - private static ExecutorService executorService = getThreadPool(10, 10); + private static final ExecutorService executorService = getThreadPool(10, 10); @AfterClass public static void afterClass() { @@ -109,7 +110,7 @@ public static void registerFactory() { } @Test - public void case01_testClient() throws ExecutionException, InterruptedException { + public void case01_testClient() throws ExecutionException, InterruptedException, TimeoutException { Client client = Client.builder().endpoints(TOPO_GLOBAL_PROXY_ADDRESS).build(); Assert.assertNotNull(client); @@ -119,7 +120,7 @@ public void case01_testClient() throws ExecutionException, InterruptedException .withPrefix(ByteSequence.from(sequence.getBytes())) .build(); CompletableFuture future = client.getKVClient().get(sequence, option); - GetResponse resp = future.get(); + GetResponse resp = future.get(10, TimeUnit.SECONDS); for (KeyValue kv : resp.getKvs()) { String key = kv.getKey().toString(Charset.defaultCharset()); Assert.assertNotNull(key); @@ -248,7 +249,7 @@ private void commonTestServer(TopoServer topoServer) throws TopoException { } @Test - public void case09_getTabletsByRange() throws ExecutionException, InterruptedException { + public void case09_getTabletsByRange() throws ExecutionException, InterruptedException, TimeoutException { Client client = Client.builder() .connectTimeout(Duration.ofSeconds(5)) @@ -264,7 +265,7 @@ public void case09_getTabletsByRange() throws ExecutionException, InterruptedExc .withRange(endSequence) .build(); CompletableFuture future = client.getKVClient().get(startSequence, option); - GetResponse resp = future.get(); + GetResponse resp = future.get(10, TimeUnit.SECONDS); for (KeyValue kv : resp.getKvs()) { String key = kv.getKey().toString(Charset.defaultCharset()); Assert.assertNotNull(key);