Skip to content

Commit c0c1286

Browse files
authored
Fix bug caused by IP reuse (#137)
1 parent a205ef7 commit c0c1286

File tree

5 files changed

+116
-61
lines changed

5 files changed

+116
-61
lines changed

src/main/java/com/jd/jdbc/discovery/HealthCheck.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -374,8 +374,7 @@ public void replaceTablet(Topodata.Tablet oldTablet, Topodata.Tablet newTablet)
374374
this.addTablet(newTablet);
375375
}
376376

377-
public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget,
378-
final boolean trivialUpdate, boolean up) {
377+
public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget, final boolean trivialUpdate, boolean up) {
379378
String tabletAlias = TopoProto.tabletAliasString(th.getTablet().getAlias());
380379
String targetKey = keyFromTarget(th.getTarget());
381380
boolean targetChanged = preTarget.getTabletType() != th.getTarget().getTabletType()
@@ -388,11 +387,16 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge
388387
return;
389388
}
390389
if (targetChanged) {
390+
// keyspace and shard are not expected to change, but just in case ...
391+
// move this tabletHealthCheck to the correct map
391392
String oldTargetKey = keyFromTarget(preTarget);
392393
this.healthData.get(oldTargetKey).remove(tabletAlias);
393394
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
394395
}
396+
// add it to the map by target and create the map record if needed
397+
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
395398
this.healthData.get(targetKey).put(tabletAlias, th);
399+
396400
boolean isPrimary = th.getTarget().getTabletType() == Topodata.TabletType.MASTER;
397401
if (isPrimary) {
398402
if (up) {

src/main/java/com/jd/jdbc/queryservice/TabletDialer.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.jd.jdbc.queryservice;
2020

21+
import com.jd.jdbc.topo.topoproto.TopoProto;
2122
import com.jd.jdbc.util.threadpool.JdkUtil;
2223
import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder;
2324
import com.jd.jdbc.util.threadpool.impl.TabletNettyExecutorService;
@@ -40,11 +41,13 @@ public class TabletDialer {
4041
private static final Map<String, IParentQueryService> TABLET_QUERY_SERVICE_CACHE = new ConcurrentHashMap<>(128 + 1);
4142

4243
public static IParentQueryService dial(final Topodata.Tablet tablet) {
43-
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
44-
if (TABLET_QUERY_SERVICE_CACHE.containsKey(addr)) {
45-
return TABLET_QUERY_SERVICE_CACHE.get(addr);
44+
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
45+
IParentQueryService queryService = TABLET_QUERY_SERVICE_CACHE.get(aliasString);
46+
if (queryService != null) {
47+
return queryService;
4648
}
4749

50+
String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
4851
ManagedChannel channel = NettyChannelBuilder.forTarget(addr).usePlaintext()
4952
.offloadExecutor(TabletNettyExecutorService.getNettyExecutorService())
5053
.executor(TabletNettyExecutorService.getNettyExecutorService())
@@ -53,18 +56,18 @@ public static IParentQueryService dial(final Topodata.Tablet tablet) {
5356
.keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
5457

5558
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
56-
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
59+
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
5760
return combinedQueryService;
5861
}
5962

6063
public static void close(final Topodata.Tablet tablet) {
61-
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
62-
TABLET_QUERY_SERVICE_CACHE.remove(addr);
64+
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
65+
TABLET_QUERY_SERVICE_CACHE.remove(aliasString);
6366
}
6467

6568
protected static void registerTabletCache(final Topodata.Tablet tablet, final IParentQueryService combinedQueryService) {
66-
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
67-
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
69+
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
70+
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
6871
}
6972

7073
protected static void clearTabletCache() {

src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package com.jd.jdbc.discovery;
2020

21+
import com.google.common.collect.Lists;
2122
import com.jd.jdbc.common.util.CollectionUtils;
2223
import com.jd.jdbc.context.IContext;
2324
import com.jd.jdbc.context.VtContext;
@@ -44,6 +45,8 @@
4445
import java.util.concurrent.ConcurrentHashMap;
4546
import java.util.concurrent.ExecutorService;
4647
import java.util.concurrent.atomic.AtomicBoolean;
48+
import lombok.AllArgsConstructor;
49+
import lombok.ToString;
4750
import org.junit.After;
4851
import org.junit.AfterClass;
4952
import org.junit.Assert;
@@ -1038,6 +1041,90 @@ public void testHealthyConcurrentModificationException() {
10381041
MockTablet.closeQueryService(mockTablet0, mockTablet1, mockTablet2);
10391042
}
10401043

1044+
@Test
1045+
public void ipReUse() {
1046+
@AllArgsConstructor
1047+
@ToString
1048+
class TestCase {
1049+
final Topodata.TabletType oldType;
1050+
1051+
final Topodata.TabletType newType;
1052+
}
1053+
TestCase testCase1 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.MASTER);
1054+
TestCase testCase2 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.REPLICA);
1055+
TestCase testCase3 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
1056+
TestCase testCase4 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.MASTER);
1057+
TestCase testCase5 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.REPLICA);
1058+
TestCase testCase6 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.RDONLY);
1059+
TestCase testCase7 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
1060+
TestCase testCase8 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.REPLICA);
1061+
TestCase testCase9 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.RDONLY);
1062+
ArrayList<TestCase> testCases = Lists.newArrayList(testCase1, testCase2, testCase3, testCase4, testCase5, testCase6, testCase7, testCase8, testCase9);
1063+
1064+
for (TestCase testCase : testCases) {
1065+
printComment("HealthCheck Test when Tablet ip reuse");
1066+
HealthCheck hc = getHealthCheck();
1067+
1068+
printComment("1. Add a no-serving Tablet");
1069+
String ip = "127.0.0.1";
1070+
MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, ip, "k", "s", portMap, testCase.oldType);
1071+
hc.addTablet(mockTablet.getTablet());
1072+
1073+
Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
1074+
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
1075+
sleepMillisSeconds(200);
1076+
1077+
printComment("2. Modify the status of Tablet to serving");
1078+
sendOnNextMessage(mockTablet, testCase.oldType, true, 0, 0.5, 0);
1079+
1080+
sleepMillisSeconds(200);
1081+
1082+
Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
1083+
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));
1084+
1085+
printComment("3. Modify the status of Tablet to no serving");
1086+
sendOnNextMessage(mockTablet, testCase.oldType, false, 0, 0.5, 0);
1087+
1088+
sleepMillisSeconds(200);
1089+
1090+
printComment("4. Add another no-serving new Tablet");
1091+
MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, ip, "k2", "s", portMap, testCase.newType);
1092+
hc.addTablet(mockTablet2.getTablet());
1093+
1094+
Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
1095+
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
1096+
sleepMillisSeconds(200);
1097+
1098+
printComment("5. Modify the status of new Tablet to serving");
1099+
sendOnNextMessage(mockTablet2, testCase.newType, true, 0, 0.5, 1);
1100+
1101+
sleepMillisSeconds(200);
1102+
1103+
Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
1104+
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));
1105+
1106+
printComment("4. removeTablet old Tablet");
1107+
hc.removeTablet(mockTablet.getTablet());
1108+
1109+
sleepMillisSeconds(2000);
1110+
1111+
Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
1112+
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));
1113+
1114+
MockTablet.closeQueryService(mockTablet, mockTablet2);
1115+
printOk("test ipReUse success,testCase :" + testCase);
1116+
HealthCheck.resetHealthCheck();
1117+
}
1118+
}
1119+
1120+
private int getHealthySize(Map<String, List<TabletHealthCheck>> map) {
1121+
int size = 0;
1122+
for (Map.Entry<String, List<TabletHealthCheck>> entry : map.entrySet()) {
1123+
size += entry.getValue().size();
1124+
}
1125+
return size;
1126+
}
1127+
10411128
private List<TabletHealthCheck> mockGetHealthyTabletStats(Map<String, List<TabletHealthCheck>> healthy, Query.Target target) {
10421129
List<TabletHealthCheck> list = healthy.get(HealthCheck.keyFromTarget(target));
10431130
if (null == list || list.isEmpty()) {

src/test/java/com/jd/jdbc/discovery/MockTablet.java

Lines changed: 6 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -81,57 +81,17 @@ public static MockTablet buildMockTablet(Topodata.Tablet tablet, GrpcCleanupRule
8181

8282
public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
8383
Topodata.TabletType type) {
84-
String serverName = InProcessServerBuilder.generateName();
85-
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
86-
MockQueryServer queryServer = new MockQueryServer(healthMessage);
87-
Server server = null;
88-
try {
89-
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
90-
} catch (IOException e) {
91-
throw new RuntimeException(e);
92-
}
93-
94-
grpcCleanup.register(server);
95-
96-
ManagedChannel channel =
97-
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
98-
grpcCleanup.register(channel);
99-
100-
Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
101-
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
102-
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);
103-
104-
return new MockTablet(tablet, healthMessage, queryServer, server, channel);
84+
return buildMockTablet(grpcCleanup, cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
10585
}
10686

10787
public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
108-
Topodata.TabletType type,
109-
int defaultMysqlPort) {
110-
String serverName = InProcessServerBuilder.generateName();
111-
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
112-
MockQueryServer queryServer = new MockQueryServer(healthMessage);
113-
Server server = null;
114-
try {
115-
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
116-
} catch (IOException e) {
117-
throw new RuntimeException(e);
118-
}
119-
120-
grpcCleanup.register(server);
121-
122-
ManagedChannel channel =
123-
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
124-
grpcCleanup.register(channel);
125-
88+
Topodata.TabletType type, int defaultMysqlPort) {
12689
Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, defaultMysqlPort);
127-
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
128-
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);
129-
130-
return new MockTablet(tablet, healthMessage, queryServer, server, channel);
90+
return buildMockTablet(tablet, grpcCleanup);
13191
}
13292

133-
public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
134-
int defaultMysqlPort) {
93+
private static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
94+
int defaultMysqlPort) {
13595
Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build();
13696
Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder()
13797
.setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort);
@@ -141,7 +101,7 @@ public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostN
141101
return tabletBuilder.build();
142102
}
143103

144-
public static void closeQueryService(MockTablet... tablets) {
104+
public static void closeQueryService(MockTablet... tablets) {
145105
MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null);
146106
for (MockTablet tablet : tablets) {
147107
try {

src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.concurrent.ExecutionException;
4646
import java.util.concurrent.ExecutorService;
4747
import java.util.concurrent.TimeUnit;
48+
import java.util.concurrent.TimeoutException;
4849
import org.junit.AfterClass;
4950
import org.junit.Assert;
5051
import org.junit.BeforeClass;
@@ -76,7 +77,7 @@ public class ServerTest extends TestSuite {
7677
//The ip address needs to be modified to the ip address of the machine where etcd is located
7778
public static String TOPO_GLOBAL_SERVER_ADDRESS = "http://127.0.0.1:2379";
7879

79-
private static ExecutorService executorService = getThreadPool(10, 10);
80+
private static final ExecutorService executorService = getThreadPool(10, 10);
8081

8182
@AfterClass
8283
public static void afterClass() {
@@ -109,7 +110,7 @@ public static void registerFactory() {
109110
}
110111

111112
@Test
112-
public void case01_testClient() throws ExecutionException, InterruptedException {
113+
public void case01_testClient() throws ExecutionException, InterruptedException, TimeoutException {
113114
Client client = Client.builder().endpoints(TOPO_GLOBAL_PROXY_ADDRESS).build();
114115
Assert.assertNotNull(client);
115116

@@ -119,7 +120,7 @@ public void case01_testClient() throws ExecutionException, InterruptedException
119120
.withPrefix(ByteSequence.from(sequence.getBytes()))
120121
.build();
121122
CompletableFuture<GetResponse> future = client.getKVClient().get(sequence, option);
122-
GetResponse resp = future.get();
123+
GetResponse resp = future.get(10, TimeUnit.SECONDS);
123124
for (KeyValue kv : resp.getKvs()) {
124125
String key = kv.getKey().toString(Charset.defaultCharset());
125126
Assert.assertNotNull(key);
@@ -248,7 +249,7 @@ private void commonTestServer(TopoServer topoServer) throws TopoException {
248249
}
249250

250251
@Test
251-
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException {
252+
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException, TimeoutException {
252253

253254
Client client = Client.builder()
254255
.connectTimeout(Duration.ofSeconds(5))
@@ -264,7 +265,7 @@ public void case09_getTabletsByRange() throws ExecutionException, InterruptedExc
264265
.withRange(endSequence)
265266
.build();
266267
CompletableFuture<GetResponse> future = client.getKVClient().get(startSequence, option);
267-
GetResponse resp = future.get();
268+
GetResponse resp = future.get(10, TimeUnit.SECONDS);
268269
for (KeyValue kv : resp.getKvs()) {
269270
String key = kv.getKey().toString(Charset.defaultCharset());
270271
Assert.assertNotNull(key);

0 commit comments

Comments
 (0)