Skip to content

Commit

Permalink
Unified status configuration in HealthCheck.
Browse files Browse the repository at this point in the history
  • Loading branch information
wangweicugw committed Aug 9, 2023
1 parent eed2111 commit 5b48622
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 116 deletions.
29 changes: 19 additions & 10 deletions src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,12 @@ public Map<String, TabletHealthCheck> getHealthByAliasCopy() {
}

public Map<String, List<TabletHealthCheck>> getHealthyCopy() {
return new TreeMap<>(healthy);
Map<String, List<TabletHealthCheck>> treeMap = new TreeMap<>();
for (Map.Entry<String, List<TabletHealthCheck>> entry : healthy.entrySet()) {
List<TabletHealthCheck> tabletHealthChecks = entry.getValue();
treeMap.put(entry.getKey(), new ArrayList<>(tabletHealthChecks));
}
return treeMap;
}

public IQueryService tabletConnection(Topodata.TabletAlias alias) {
Expand Down Expand Up @@ -416,7 +421,6 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge
this.lock.unlock();
if (targetChanged) {
Topodata.Tablet tablet = th.getTablet().toBuilder().setType(th.getTarget().getTabletType()).setKeyspace(th.getTarget().getKeyspace()).setShard(th.getTarget().getShard()).build();

th.getQueryService().setTablet(tablet);
th.closeNativeQueryService();
}
Expand Down Expand Up @@ -455,11 +459,11 @@ public void run() {
private void doWatchTabletHealthCheckStream(final long currentTimeMillis) {
healthByAlias.forEach((alias, thc) -> {
if (thc.healthCheckCtxIsCanceled()) {
thc.finalizeConn();
thc.shutdown();
return;
}

if (thc.getLastResponseTimestamp() > 0 && currentTimeMillis - thc.getLastResponseTimestamp() > (thc.getHealthCheckTimeout() * 1000)) {
if (thc.getLastResponseTimestamp() > 0 && currentTimeMillis - thc.getLastResponseTimestamp() > thc.getHealthCheckTimeout()) {
thc.getServing().set(false);
thc.getRetrying().set(true);
thc.getLastError().set("health check timed out latest " + thc.getLastResponseTimestamp());
Expand All @@ -468,24 +472,29 @@ private void doWatchTabletHealthCheckStream(final long currentTimeMillis) {
}

TabletHealthCheck.TabletStreamHealthDetailStatus tabletStreamHealthDetailStatus = thc.getTabletStreamHealthDetailStatus().get();
if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE) {
if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE) {
return;
}

if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET) {
if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET) {
return;
}

if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR) {
if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR) {
thc.getServing().set(false);
thc.getRetrying().set(true);
thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.message);
thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.getMessage());
thc.startHealthCheckStream();
return;
}

if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE) {
// tabletHealthCheck is added to healthByAlias just recently, no healthCheck signal has been received.
if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE) {
return;
}
if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_MISMATCH) {
thc.getServing().set(false);
thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.getMessage());
this.removeTablet(thc.getTablet());
return;
}

Expand Down
159 changes: 70 additions & 89 deletions src/main/java/com/jd/jdbc/discovery/TabletHealthCheck.java

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions src/main/java/com/jd/jdbc/monitor/HealthyCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,12 +85,7 @@ public static long stateHealthyChecksum(Map<String, List<TabletHealthCheck>> hea
StringBuilder sb = new StringBuilder();

for (List<TabletHealthCheck> tabletHealthCheckList : healthy.values()) {
tabletHealthCheckList.sort(new Comparator<TabletHealthCheck>() {
@Override
public int compare(TabletHealthCheck o1, TabletHealthCheck o2) {
return Long.compare(o1.getTablet().getAlias().getUid(), o2.getTablet().getAlias().getUid());
}
});
tabletHealthCheckList.sort(Comparator.comparingLong(o -> o.getTablet().getAlias().getUid()));

for (TabletHealthCheck tabletHealthCheck : tabletHealthCheckList) {
if (!tabletHealthCheck.getServing().get()) {
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/jd/jdbc/topo/etcd2topo/EtcdWatcher.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.topo.etcd2topo;

import io.etcd.jetcd.ByteSequence;
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/jd/jdbc/topo/etcd2topo/SrvKeyspaceListener.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.topo.etcd2topo;

import com.jd.jdbc.Executor;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
Copyright 2021 JD Project Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.util.threadpool.impl;

import com.jd.jdbc.monitor.ThreadPoolCollector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package com.jd.jdbc.vitess.metadata;

import static com.jd.jdbc.common.Constant.DRIVER_MAJOR_VERSION;
import static com.jd.jdbc.common.Constant.DRIVER_MINOR_VERSION;
import static com.jd.jdbc.common.Constant.DRIVER_NAME;
import com.jd.jdbc.pool.InnerConnection;
import com.jd.jdbc.pool.StatefulConnectionPool;
import com.jd.jdbc.queryservice.util.RoleUtils;
Expand Down
18 changes: 18 additions & 0 deletions src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,21 @@
/*
Copyright 2021 JD Project Authors. Licensed under Apache-2.0.
Copyright 2019 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package com.jd.jdbc.discovery;

import com.jd.jdbc.common.Constant;
Expand Down
132 changes: 122 additions & 10 deletions src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,34 +42,44 @@
import io.vitess.proto.Topodata;
import java.io.IOException;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.AllArgsConstructor;
import lombok.Getter;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import testsuite.TestSuite;

public class HealthCheckTest extends TestSuite {
private static final IContext globalContext = VtContext.withCancel(VtContext.background());

private final Map<String, Integer> portMap = new HashMap<>();

private int defaultMysqlPort = 3358;
private static final ExecutorService executorService = getThreadPool(10, 10);

@Rule
public GrpcCleanupRule grpcCleanup;

@Rule
public ExpectedException thrown = ExpectedException.none();

private final Map<String, Integer> portMap = new HashMap<>();

private int defaultMysqlPort = 3358;

@BeforeClass
public static void initPool() {
HealthCheck.resetHealthCheck();
Expand All @@ -80,6 +90,11 @@ public static void initPool() {
VtQueryExecutorService.initialize(null, null, null, null);
}

@AfterClass
public static void afterClass() {
executorService.shutdownNow();
}

@Before
public void init() throws IOException {
grpcCleanup = new GrpcCleanupRule();
Expand Down Expand Up @@ -386,16 +401,14 @@ public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedE
Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size());

// remove tablet and send a onNext message parallel
ExecutorService pool = getThreadPool(1, 1);
sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2);
pool.execute(() -> hc.removeTablet(mockTablet.getTablet()));
executorService.execute(() -> hc.removeTablet(mockTablet.getTablet()));

Thread.sleep(200);
// healthcheck shouldn't response onNext message
Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size());
closeQueryService(mockTablet);
pool.shutdown();
printOk();
}

Expand Down Expand Up @@ -423,16 +436,14 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx
Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size());

// remove tablet and send a onNext message parallel
ExecutorService pool = getThreadPool(1, 1);
pool.execute(() -> hc.removeTablet(mockTablet.getTablet()));
executorService.execute(() -> hc.removeTablet(mockTablet.getTablet()));
sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2);

Thread.sleep(200);
// healthcheck shouldn't response onNext message
Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size());
closeQueryService(mockTablet);
pool.shutdown();
printOk();
}

Expand Down Expand Up @@ -896,7 +907,6 @@ public void testHealthyListChecksum() {
Topodata.Tablet tablet2 = buildTablet("cella", 8, "1.1.1.8", "k", "s", portMap, Topodata.TabletType.REPLICA);
Query.Target target = Query.Target.newBuilder().setKeyspace(tablet1.getKeyspace()).setShard(tablet1.getShard()).setTabletType(tablet1.getType()).build();


Map<String, List<TabletHealthCheck>> healthy1 = hc.getHealthyCopy();
List<TabletHealthCheck> healthyMap1 = new ArrayList<>();

Expand Down Expand Up @@ -953,6 +963,108 @@ public void testHealthyChecksumSetBehindMaster() throws IOException, Interrupted
closeQueryService(mockTablet1, mockTablet2);
}

@Test
public void testConcurrentModificationException() throws InterruptedException {
thrown.expect(ConcurrentModificationException.class);

List<TabletHealthCheck> tabletHealthCheckList = new ArrayList<>();
String keyspace = "k";
String shard = "s";
Topodata.TabletType type = Topodata.TabletType.REPLICA;
Query.Target target = Query.Target.newBuilder().setKeyspace(keyspace).setShard(keyspace).setTabletType(type).build();
Topodata.Tablet tablet1 = buildTablet("cell1", 1, "a", keyspace, shard, portMap, type);
Topodata.Tablet tablet2 = buildTablet("cell1", 100, "a", keyspace, shard, portMap, type);
Topodata.Tablet tablet3 = buildTablet("cell1", 10, "a", keyspace, shard, portMap, type);
TabletHealthCheck tabletHealthCheck1 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet1, target);
TabletHealthCheck tabletHealthCheck2 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet2, target);
TabletHealthCheck tabletHealthCheck3 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet3, target);
tabletHealthCheckList.add(tabletHealthCheck1);
tabletHealthCheckList.add(tabletHealthCheck2);
tabletHealthCheckList.add(tabletHealthCheck3);
Map<String, List<TabletHealthCheck>> healthy = new ConcurrentHashMap<>(16);
healthy.put(HealthCheck.keyFromTarget(target), tabletHealthCheckList);

Map<String, List<TabletHealthCheck>> treeMap = new TreeMap<>(healthy);

int count = 1000000;
executorService.execute(() -> {
for (int i = 0; i < count; i++) {
HealthyCollector.stateHealthyChecksum(treeMap);
}
});
for (int i = 0; i < count; i++) {
mockGetHealthyTabletStats(healthy, target);
}

Thread.sleep(2000);
}

@Test
public void testHealthyConcurrentModificationException() throws InterruptedException, IOException {
HealthCheck hc = getHealthCheck();

String keyspace = "k";
String shard = "s";
// add replica tablet
MockTablet mockTablet0 = buildMockTablet("cell", 1, "a", keyspace, shard, portMap, Topodata.TabletType.REPLICA);
hc.addTablet(mockTablet0.getTablet());
// add replica tablet
MockTablet mockTablet1 = buildMockTablet("cell", 100, "b", keyspace, shard, portMap, Topodata.TabletType.REPLICA);
hc.addTablet(mockTablet1.getTablet());
// add replica tablet
MockTablet mockTablet2 = buildMockTablet("cell", 10, "c", keyspace, shard, portMap, Topodata.TabletType.REPLICA);
hc.addTablet(mockTablet2.getTablet());

Thread.sleep(200);
Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size());

printComment("c. Modify the status of Tablet to serving");
sendOnNextMessage(mockTablet0, Topodata.TabletType.REPLICA, true, 0, 0.5, 0);
sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 0);
sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 0);

Thread.sleep(200);

Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size());
Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size());
List<TabletHealthCheck> healthyTabletStats = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA));
Assert.assertNotNull(healthyTabletStats);
Assert.assertEquals("Wrong Tablet data", 3, healthyTabletStats.size());

int count = 1000000;
Map<String, List<TabletHealthCheck>> healthy = hc.getHealthyCopy();
executorService.execute(() -> {
for (int i = 0; i < count; i++) {
HealthyCollector.stateHealthyChecksum(healthy);
}
});
for (int i = 0; i < count; i++) {
healthyTabletStats = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA));
Assert.assertNotNull(healthyTabletStats);
Assert.assertEquals("Wrong Tablet data", 3, healthyTabletStats.size());
}

Thread.sleep(2000);
}

private List<TabletHealthCheck> mockGetHealthyTabletStats(Map<String, List<TabletHealthCheck>> healthy, Query.Target target) {
List<TabletHealthCheck> list = healthy.get(HealthCheck.keyFromTarget(target));
if (null == list || list.isEmpty()) {
return null;
}
if (target.getTabletType() == Topodata.TabletType.MASTER) {
return list;
}
List<TabletHealthCheck> servlist = new ArrayList<>(list.size());
list.forEach(entry -> {
if (entry.getServing().get()) {
servlist.add(entry);
}
});
return servlist;
}

private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) {
for (String cell : cells) {
TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, keyspaceName);
Expand Down
Loading

0 comments on commit 5b48622

Please sign in to comment.