Skip to content

Commit

Permalink
Fix bug caused by IP reuse
Browse files Browse the repository at this point in the history
  • Loading branch information
wangweicugw committed Sep 15, 2023
1 parent a205ef7 commit 1a087a8
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 61 deletions.
8 changes: 6 additions & 2 deletions src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ public void replaceTablet(Topodata.Tablet oldTablet, Topodata.Tablet newTablet)
this.addTablet(newTablet);
}

public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget,
final boolean trivialUpdate, boolean up) {
public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget, final boolean trivialUpdate, boolean up) {
String tabletAlias = TopoProto.tabletAliasString(th.getTablet().getAlias());
String targetKey = keyFromTarget(th.getTarget());
boolean targetChanged = preTarget.getTabletType() != th.getTarget().getTabletType()
Expand All @@ -388,11 +387,16 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge
return;
}
if (targetChanged) {
// keyspace and shard are not expected to change, but just in case ...
// move this tabletHealthCheck to the correct map
String oldTargetKey = keyFromTarget(preTarget);
this.healthData.get(oldTargetKey).remove(tabletAlias);
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
}
// add it to the map by target and create the map record if needed
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
this.healthData.get(targetKey).put(tabletAlias, th);

boolean isPrimary = th.getTarget().getTabletType() == Topodata.TabletType.MASTER;
if (isPrimary) {
if (up) {
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/com/jd/jdbc/queryservice/TabletDialer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.jd.jdbc.queryservice;

import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.threadpool.JdkUtil;
import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder;
import com.jd.jdbc.util.threadpool.impl.TabletNettyExecutorService;
Expand All @@ -40,11 +41,13 @@ public class TabletDialer {
private static final Map<String, IParentQueryService> TABLET_QUERY_SERVICE_CACHE = new ConcurrentHashMap<>(128 + 1);

public static IParentQueryService dial(final Topodata.Tablet tablet) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
if (TABLET_QUERY_SERVICE_CACHE.containsKey(addr)) {
return TABLET_QUERY_SERVICE_CACHE.get(addr);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
IParentQueryService queryService = TABLET_QUERY_SERVICE_CACHE.get(aliasString);
if (queryService != null) {
return queryService;
}

String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
ManagedChannel channel = NettyChannelBuilder.forTarget(addr).usePlaintext()
.offloadExecutor(TabletNettyExecutorService.getNettyExecutorService())
.executor(TabletNettyExecutorService.getNettyExecutorService())
Expand All @@ -53,18 +56,18 @@ public static IParentQueryService dial(final Topodata.Tablet tablet) {
.keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();

IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
return combinedQueryService;
}

public static void close(final Topodata.Tablet tablet) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
TABLET_QUERY_SERVICE_CACHE.remove(addr);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
TABLET_QUERY_SERVICE_CACHE.remove(aliasString);
}

protected static void registerTabletCache(final Topodata.Tablet tablet, final IParentQueryService combinedQueryService) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
}

protected static void clearTabletCache() {
Expand Down
87 changes: 87 additions & 0 deletions src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.jd.jdbc.discovery;

import com.google.common.collect.Lists;
import com.jd.jdbc.common.util.CollectionUtils;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.context.VtContext;
Expand All @@ -44,6 +45,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -1038,6 +1041,90 @@ public void testHealthyConcurrentModificationException() {
MockTablet.closeQueryService(mockTablet0, mockTablet1, mockTablet2);
}

@Test
public void ipReUse() {
@AllArgsConstructor
@ToString
class TestCase {
final Topodata.TabletType oldType;

final Topodata.TabletType newType;
}
TestCase testCase1 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.MASTER);
TestCase testCase2 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.REPLICA);
TestCase testCase3 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
TestCase testCase4 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.MASTER);
TestCase testCase5 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.REPLICA);
TestCase testCase6 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.RDONLY);
TestCase testCase7 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
TestCase testCase8 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.REPLICA);
TestCase testCase9 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.RDONLY);
ArrayList<TestCase> testCases = Lists.newArrayList(testCase1, testCase2, testCase3, testCase4, testCase5, testCase6, testCase7, testCase8, testCase9);

for (TestCase testCase : testCases) {
printComment("HealthCheck Test when Tablet ip reuse");
HealthCheck hc = getHealthCheck();

printComment("1. Add a no-serving Tablet");
String ip = "127.0.0.1";
MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, ip, "k", "s", portMap, testCase.oldType);
hc.addTablet(mockTablet.getTablet());

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
sleepMillisSeconds(200);

printComment("2. Modify the status of Tablet to serving");
sendOnNextMessage(mockTablet, testCase.oldType, true, 0, 0.5, 0);

sleepMillisSeconds(200);

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

printComment("3. Modify the status of Tablet to no serving");
sendOnNextMessage(mockTablet, testCase.oldType, false, 0, 0.5, 0);

sleepMillisSeconds(200);

printComment("4. Add another no-serving new Tablet");
MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, ip, "k2", "s", portMap, testCase.newType);
hc.addTablet(mockTablet2.getTablet());

Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
sleepMillisSeconds(200);

printComment("5. Modify the status of new Tablet to serving");
sendOnNextMessage(mockTablet2, testCase.newType, true, 0, 0.5, 1);

sleepMillisSeconds(200);

Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

printComment("4. removeTablet old Tablet");
hc.removeTablet(mockTablet.getTablet());

sleepMillisSeconds(2000);

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

MockTablet.closeQueryService(mockTablet, mockTablet2);
printOk("test ipReUse success,testCase :" + testCase);
HealthCheck.resetHealthCheck();
}
}

private int getHealthySize(Map<String, List<TabletHealthCheck>> map) {
int size = 0;
for (Map.Entry<String, List<TabletHealthCheck>> entry : map.entrySet()) {
size += entry.getValue().size();
}
return size;
}

private List<TabletHealthCheck> mockGetHealthyTabletStats(Map<String, List<TabletHealthCheck>> healthy, Query.Target target) {
List<TabletHealthCheck> list = healthy.get(HealthCheck.keyFromTarget(target));
if (null == list || list.isEmpty()) {
Expand Down
52 changes: 6 additions & 46 deletions src/test/java/com/jd/jdbc/discovery/MockTablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,57 +81,17 @@ public static MockTablet buildMockTablet(Topodata.Tablet tablet, GrpcCleanupRule

public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
Topodata.TabletType type) {
String serverName = InProcessServerBuilder.generateName();
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
MockQueryServer queryServer = new MockQueryServer(healthMessage);
Server server = null;
try {
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
} catch (IOException e) {
throw new RuntimeException(e);
}

grpcCleanup.register(server);

ManagedChannel channel =
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
grpcCleanup.register(channel);

Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);

return new MockTablet(tablet, healthMessage, queryServer, server, channel);
return buildMockTablet(grpcCleanup, cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
}

public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
Topodata.TabletType type,
int defaultMysqlPort) {
String serverName = InProcessServerBuilder.generateName();
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
MockQueryServer queryServer = new MockQueryServer(healthMessage);
Server server = null;
try {
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
} catch (IOException e) {
throw new RuntimeException(e);
}

grpcCleanup.register(server);

ManagedChannel channel =
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
grpcCleanup.register(channel);

Topodata.TabletType type, int defaultMysqlPort) {
Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, defaultMysqlPort);
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);

return new MockTablet(tablet, healthMessage, queryServer, server, channel);
return buildMockTablet(tablet, grpcCleanup);
}

public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
int defaultMysqlPort) {
private static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
int defaultMysqlPort) {
Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build();
Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder()
.setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort);
Expand All @@ -141,7 +101,7 @@ public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostN
return tabletBuilder.build();
}

public static void closeQueryService(MockTablet... tablets) {
public static void closeQueryService(MockTablet... tablets) {
MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null);
for (MockTablet tablet : tablets) {
try {
Expand Down
11 changes: 6 additions & 5 deletions src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class ServerTest extends TestSuite {
//The ip address needs to be modified to the ip address of the machine where etcd is located
public static String TOPO_GLOBAL_SERVER_ADDRESS = "http://127.0.0.1:2379";

private static ExecutorService executorService = getThreadPool(10, 10);
private static final ExecutorService executorService = getThreadPool(10, 10);

@AfterClass
public static void afterClass() {
Expand Down Expand Up @@ -109,7 +110,7 @@ public static void registerFactory() {
}

@Test
public void case01_testClient() throws ExecutionException, InterruptedException {
public void case01_testClient() throws ExecutionException, InterruptedException, TimeoutException {
Client client = Client.builder().endpoints(TOPO_GLOBAL_PROXY_ADDRESS).build();
Assert.assertNotNull(client);

Expand All @@ -119,7 +120,7 @@ public void case01_testClient() throws ExecutionException, InterruptedException
.withPrefix(ByteSequence.from(sequence.getBytes()))
.build();
CompletableFuture<GetResponse> future = client.getKVClient().get(sequence, option);
GetResponse resp = future.get();
GetResponse resp = future.get(10, TimeUnit.SECONDS);
for (KeyValue kv : resp.getKvs()) {
String key = kv.getKey().toString(Charset.defaultCharset());
Assert.assertNotNull(key);
Expand Down Expand Up @@ -248,7 +249,7 @@ private void commonTestServer(TopoServer topoServer) throws TopoException {
}

@Test
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException {
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException, TimeoutException {

Client client = Client.builder()
.connectTimeout(Duration.ofSeconds(5))
Expand All @@ -264,7 +265,7 @@ public void case09_getTabletsByRange() throws ExecutionException, InterruptedExc
.withRange(endSequence)
.build();
CompletableFuture<GetResponse> future = client.getKVClient().get(startSequence, option);
GetResponse resp = future.get();
GetResponse resp = future.get(10, TimeUnit.SECONDS);
for (KeyValue kv : resp.getKvs()) {
String key = kv.getKey().toString(Charset.defaultCharset());
Assert.assertNotNull(key);
Expand Down

0 comments on commit 1a087a8

Please sign in to comment.