Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix bug caused by IP reuse #137

Merged
merged 1 commit into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,7 @@ public void replaceTablet(Topodata.Tablet oldTablet, Topodata.Tablet newTablet)
this.addTablet(newTablet);
}

public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget,
final boolean trivialUpdate, boolean up) {
public void updateHealth(final TabletHealthCheck th, final Query.Target preTarget, final boolean trivialUpdate, boolean up) {
String tabletAlias = TopoProto.tabletAliasString(th.getTablet().getAlias());
String targetKey = keyFromTarget(th.getTarget());
boolean targetChanged = preTarget.getTabletType() != th.getTarget().getTabletType()
Expand All @@ -388,11 +387,16 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge
return;
}
if (targetChanged) {
// keyspace and shard are not expected to change, but just in case ...
// move this tabletHealthCheck to the correct map
String oldTargetKey = keyFromTarget(preTarget);
this.healthData.get(oldTargetKey).remove(tabletAlias);
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
}
// add it to the map by target and create the map record if needed
MapUtil.computeIfAbsent(healthData, targetKey, key -> new HashMap<>());
this.healthData.get(targetKey).put(tabletAlias, th);

boolean isPrimary = th.getTarget().getTabletType() == Topodata.TabletType.MASTER;
if (isPrimary) {
if (up) {
Expand Down
19 changes: 11 additions & 8 deletions src/main/java/com/jd/jdbc/queryservice/TabletDialer.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.jd.jdbc.queryservice;

import com.jd.jdbc.topo.topoproto.TopoProto;
import com.jd.jdbc.util.threadpool.JdkUtil;
import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder;
import com.jd.jdbc.util.threadpool.impl.TabletNettyExecutorService;
Expand All @@ -40,11 +41,13 @@ public class TabletDialer {
private static final Map<String, IParentQueryService> TABLET_QUERY_SERVICE_CACHE = new ConcurrentHashMap<>(128 + 1);

public static IParentQueryService dial(final Topodata.Tablet tablet) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
if (TABLET_QUERY_SERVICE_CACHE.containsKey(addr)) {
return TABLET_QUERY_SERVICE_CACHE.get(addr);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
IParentQueryService queryService = TABLET_QUERY_SERVICE_CACHE.get(aliasString);
if (queryService != null) {
return queryService;
}

String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
ManagedChannel channel = NettyChannelBuilder.forTarget(addr).usePlaintext()
.offloadExecutor(TabletNettyExecutorService.getNettyExecutorService())
.executor(TabletNettyExecutorService.getNettyExecutorService())
Expand All @@ -53,18 +56,18 @@ public static IParentQueryService dial(final Topodata.Tablet tablet) {
.keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();

IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
return combinedQueryService;
}

public static void close(final Topodata.Tablet tablet) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
TABLET_QUERY_SERVICE_CACHE.remove(addr);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
TABLET_QUERY_SERVICE_CACHE.remove(aliasString);
}

protected static void registerTabletCache(final Topodata.Tablet tablet, final IParentQueryService combinedQueryService) {
final String addr = tablet.getHostname() + ":" + tablet.getPortMapMap().get("grpc");
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(addr, combinedQueryService);
final String aliasString = TopoProto.tabletAliasString(tablet.getAlias());
TABLET_QUERY_SERVICE_CACHE.putIfAbsent(aliasString, combinedQueryService);
}

protected static void clearTabletCache() {
Expand Down
87 changes: 87 additions & 0 deletions src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.jd.jdbc.discovery;

import com.google.common.collect.Lists;
import com.jd.jdbc.common.util.CollectionUtils;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.context.VtContext;
Expand All @@ -44,6 +45,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AllArgsConstructor;
import lombok.ToString;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
Expand Down Expand Up @@ -1038,6 +1041,90 @@ public void testHealthyConcurrentModificationException() {
MockTablet.closeQueryService(mockTablet0, mockTablet1, mockTablet2);
}

@Test
public void ipReUse() {
@AllArgsConstructor
@ToString
class TestCase {
final Topodata.TabletType oldType;

final Topodata.TabletType newType;
}
TestCase testCase1 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.MASTER);
TestCase testCase2 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.REPLICA);
TestCase testCase3 = new TestCase(Topodata.TabletType.REPLICA, Topodata.TabletType.RDONLY);
TestCase testCase4 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.MASTER);
TestCase testCase5 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.REPLICA);
TestCase testCase6 = new TestCase(Topodata.TabletType.MASTER, Topodata.TabletType.RDONLY);
TestCase testCase7 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.MASTER);
TestCase testCase8 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.REPLICA);
TestCase testCase9 = new TestCase(Topodata.TabletType.RDONLY, Topodata.TabletType.RDONLY);
ArrayList<TestCase> testCases = Lists.newArrayList(testCase1, testCase2, testCase3, testCase4, testCase5, testCase6, testCase7, testCase8, testCase9);

for (TestCase testCase : testCases) {
printComment("HealthCheck Test when Tablet ip reuse");
HealthCheck hc = getHealthCheck();

printComment("1. Add a no-serving Tablet");
String ip = "127.0.0.1";
MockTablet mockTablet = MockTablet.buildMockTablet(grpcCleanup, "cell", 0, ip, "k", "s", portMap, testCase.oldType);
hc.addTablet(mockTablet.getTablet());

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
sleepMillisSeconds(200);

printComment("2. Modify the status of Tablet to serving");
sendOnNextMessage(mockTablet, testCase.oldType, true, 0, 0.5, 0);

sleepMillisSeconds(200);

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

printComment("3. Modify the status of Tablet to no serving");
sendOnNextMessage(mockTablet, testCase.oldType, false, 0, 0.5, 0);

sleepMillisSeconds(200);

printComment("4. Add another no-serving new Tablet");
MockTablet mockTablet2 = MockTablet.buildMockTablet(grpcCleanup, "cell", 1, ip, "k2", "s", portMap, testCase.newType);
hc.addTablet(mockTablet2.getTablet());

Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, getHealthySize(hc.getHealthyCopy()));
sleepMillisSeconds(200);

printComment("5. Modify the status of new Tablet to serving");
sendOnNextMessage(mockTablet2, testCase.newType, true, 0, 0.5, 1);

sleepMillisSeconds(200);

Assert.assertEquals("Wrong Tablet data", 2, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

printComment("4. removeTablet old Tablet");
hc.removeTablet(mockTablet.getTablet());

sleepMillisSeconds(2000);

Assert.assertEquals("Wrong Tablet data", 1, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, getHealthySize(hc.getHealthyCopy()));

MockTablet.closeQueryService(mockTablet, mockTablet2);
printOk("test ipReUse success,testCase :" + testCase);
HealthCheck.resetHealthCheck();
}
}

private int getHealthySize(Map<String, List<TabletHealthCheck>> map) {
int size = 0;
for (Map.Entry<String, List<TabletHealthCheck>> entry : map.entrySet()) {
size += entry.getValue().size();
}
return size;
}

private List<TabletHealthCheck> mockGetHealthyTabletStats(Map<String, List<TabletHealthCheck>> healthy, Query.Target target) {
List<TabletHealthCheck> list = healthy.get(HealthCheck.keyFromTarget(target));
if (null == list || list.isEmpty()) {
Expand Down
52 changes: 6 additions & 46 deletions src/test/java/com/jd/jdbc/discovery/MockTablet.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,57 +81,17 @@ public static MockTablet buildMockTablet(Topodata.Tablet tablet, GrpcCleanupRule

public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
Topodata.TabletType type) {
String serverName = InProcessServerBuilder.generateName();
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
MockQueryServer queryServer = new MockQueryServer(healthMessage);
Server server = null;
try {
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
} catch (IOException e) {
throw new RuntimeException(e);
}

grpcCleanup.register(server);

ManagedChannel channel =
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
grpcCleanup.register(channel);

Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);

return new MockTablet(tablet, healthMessage, queryServer, server, channel);
return buildMockTablet(grpcCleanup, cell, uid, hostName, keyspaceName, shard, portMap, type, 3358);
}

public static MockTablet buildMockTablet(GrpcCleanupRule grpcCleanup, String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap,
Topodata.TabletType type,
int defaultMysqlPort) {
String serverName = InProcessServerBuilder.generateName();
BlockingQueue<MockQueryServer.HealthCheckMessage> healthMessage = new ArrayBlockingQueue<>(2);
MockQueryServer queryServer = new MockQueryServer(healthMessage);
Server server = null;
try {
server = InProcessServerBuilder.forName(serverName).directExecutor().addService(queryServer).build().start();
} catch (IOException e) {
throw new RuntimeException(e);
}

grpcCleanup.register(server);

ManagedChannel channel =
InProcessChannelBuilder.forName(serverName).directExecutor().keepAliveTimeout(10, TimeUnit.SECONDS).keepAliveTime(10, TimeUnit.SECONDS).keepAliveWithoutCalls(true).build();
grpcCleanup.register(channel);

Topodata.TabletType type, int defaultMysqlPort) {
Topodata.Tablet tablet = buildTablet(cell, uid, hostName, keyspaceName, shard, portMap, type, defaultMysqlPort);
IParentQueryService combinedQueryService = new CombinedQueryService(channel, tablet);
TabletDialerAgent.registerTabletCache(tablet, combinedQueryService);

return new MockTablet(tablet, healthMessage, queryServer, server, channel);
return buildMockTablet(tablet, grpcCleanup);
}

public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
int defaultMysqlPort) {
private static Topodata.Tablet buildTablet(String cell, Integer uid, String hostName, String keyspaceName, String shard, Map<String, Integer> portMap, Topodata.TabletType type,
int defaultMysqlPort) {
Topodata.TabletAlias tabletAlias = Topodata.TabletAlias.newBuilder().setCell(cell).setUid(uid).build();
Topodata.Tablet.Builder tabletBuilder = Topodata.Tablet.newBuilder()
.setHostname(hostName).setAlias(tabletAlias).setKeyspace(keyspaceName).setShard(shard).setType(type).setMysqlHostname(hostName).setMysqlPort(defaultMysqlPort);
Expand All @@ -141,7 +101,7 @@ public static Topodata.Tablet buildTablet(String cell, Integer uid, String hostN
return tabletBuilder.build();
}

public static void closeQueryService(MockTablet... tablets) {
public static void closeQueryService(MockTablet... tablets) {
MockQueryServer.HealthCheckMessage close = new MockQueryServer.HealthCheckMessage(MockQueryServer.MessageType.Close, null);
for (MockTablet tablet : tablets) {
try {
Expand Down
11 changes: 6 additions & 5 deletions src/test/java/com/jd/jdbc/topo/etcd2topo/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -76,7 +77,7 @@ public class ServerTest extends TestSuite {
//The ip address needs to be modified to the ip address of the machine where etcd is located
public static String TOPO_GLOBAL_SERVER_ADDRESS = "http://127.0.0.1:2379";

private static ExecutorService executorService = getThreadPool(10, 10);
private static final ExecutorService executorService = getThreadPool(10, 10);

@AfterClass
public static void afterClass() {
Expand Down Expand Up @@ -109,7 +110,7 @@ public static void registerFactory() {
}

@Test
public void case01_testClient() throws ExecutionException, InterruptedException {
public void case01_testClient() throws ExecutionException, InterruptedException, TimeoutException {
Client client = Client.builder().endpoints(TOPO_GLOBAL_PROXY_ADDRESS).build();
Assert.assertNotNull(client);

Expand All @@ -119,7 +120,7 @@ public void case01_testClient() throws ExecutionException, InterruptedException
.withPrefix(ByteSequence.from(sequence.getBytes()))
.build();
CompletableFuture<GetResponse> future = client.getKVClient().get(sequence, option);
GetResponse resp = future.get();
GetResponse resp = future.get(10, TimeUnit.SECONDS);
for (KeyValue kv : resp.getKvs()) {
String key = kv.getKey().toString(Charset.defaultCharset());
Assert.assertNotNull(key);
Expand Down Expand Up @@ -248,7 +249,7 @@ private void commonTestServer(TopoServer topoServer) throws TopoException {
}

@Test
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException {
public void case09_getTabletsByRange() throws ExecutionException, InterruptedException, TimeoutException {

Client client = Client.builder()
.connectTimeout(Duration.ofSeconds(5))
Expand All @@ -264,7 +265,7 @@ public void case09_getTabletsByRange() throws ExecutionException, InterruptedExc
.withRange(endSequence)
.build();
CompletableFuture<GetResponse> future = client.getKVClient().get(startSequence, option);
GetResponse resp = future.get();
GetResponse resp = future.get(10, TimeUnit.SECONDS);
for (KeyValue kv : resp.getKvs()) {
String key = kv.getKey().toString(Charset.defaultCharset());
Assert.assertNotNull(key);
Expand Down
Loading