diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index 0413f93..cda776a 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -133,7 +133,12 @@ public Map getHealthByAliasCopy() { } public Map> getHealthyCopy() { - return new TreeMap<>(healthy); + Map> treeMap = new TreeMap<>(); + for (Map.Entry> entry : healthy.entrySet()) { + List tabletHealthChecks = entry.getValue(); + treeMap.put(entry.getKey(), new ArrayList<>(tabletHealthChecks)); + } + return treeMap; } public IQueryService tabletConnection(Topodata.TabletAlias alias) { @@ -416,7 +421,6 @@ public void updateHealth(final TabletHealthCheck th, final Query.Target preTarge this.lock.unlock(); if (targetChanged) { Topodata.Tablet tablet = th.getTablet().toBuilder().setType(th.getTarget().getTabletType()).setKeyspace(th.getTarget().getKeyspace()).setShard(th.getTarget().getShard()).build(); - th.getQueryService().setTablet(tablet); th.closeNativeQueryService(); } @@ -455,11 +459,11 @@ public void run() { private void doWatchTabletHealthCheckStream(final long currentTimeMillis) { healthByAlias.forEach((alias, thc) -> { if (thc.healthCheckCtxIsCanceled()) { - thc.finalizeConn(); + thc.shutdown(); return; } - if (thc.getLastResponseTimestamp() > 0 && currentTimeMillis - thc.getLastResponseTimestamp() > (thc.getHealthCheckTimeout() * 1000)) { + if (thc.getLastResponseTimestamp() > 0 && currentTimeMillis - thc.getLastResponseTimestamp() > thc.getHealthCheckTimeout()) { thc.getServing().set(false); thc.getRetrying().set(true); thc.getLastError().set("health check timed out latest " + thc.getLastResponseTimestamp()); @@ -468,24 +472,29 @@ private void doWatchTabletHealthCheckStream(final long currentTimeMillis) { } TabletHealthCheck.TabletStreamHealthDetailStatus tabletStreamHealthDetailStatus = thc.getTabletStreamHealthDetailStatus().get(); - if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE) { + if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE) { return; } - if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET) { + if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET) { return; } - if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR) { + if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR) { thc.getServing().set(false); thc.getRetrying().set(true); - thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.message); + thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.getMessage()); thc.startHealthCheckStream(); return; } - if (tabletStreamHealthDetailStatus.status == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE) { - // tabletHealthCheck is added to healthByAlias just recently, no healthCheck signal has been received. + if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE) { + return; + } + if (tabletStreamHealthDetailStatus.getStatus() == TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_MISMATCH) { + thc.getServing().set(false); + thc.getLastError().set("health check error :" + tabletStreamHealthDetailStatus.getMessage()); + this.removeTablet(thc.getTablet()); return; } diff --git a/src/main/java/com/jd/jdbc/discovery/TabletHealthCheck.java b/src/main/java/com/jd/jdbc/discovery/TabletHealthCheck.java index 1a41b07..6b56304 100644 --- a/src/main/java/com/jd/jdbc/discovery/TabletHealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/TabletHealthCheck.java @@ -18,18 +18,13 @@ package com.jd.jdbc.discovery; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_MISMATCH; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE; -import static com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE; import com.jd.jdbc.queryservice.IHealthCheckQueryService; import com.jd.jdbc.queryservice.IParentQueryService; import com.jd.jdbc.queryservice.IQueryService; import com.jd.jdbc.queryservice.TabletDialer; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; +import com.jd.jdbc.sqlparser.utils.StringUtils; import com.jd.jdbc.topo.topoproto.TopoProto; import io.grpc.Status; import io.grpc.StatusRuntimeException; @@ -38,23 +33,24 @@ import io.vitess.proto.Query; import io.vitess.proto.Query.StreamHealthResponse; import io.vitess.proto.Topodata; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import lombok.AllArgsConstructor; -import lombok.Data; import lombok.Getter; import lombok.Setter; -@Data public class TabletHealthCheck { private static final Log log = LogFactory.getLog(TabletHealthCheck.class); - private static final String CTX_RESPONSE = "health check response ok"; + private static final String STOP_HEALTH_CHECK_STREAM = "stopHealthCheckStream"; - private static final String CTX_TABLET_STATUS_MISMATCH = "health check response stats mismatch"; - - private final long healthCheckTimeout = 60; //second + /** + * 60s + */ + @Getter + private final long healthCheckTimeout = 60000; @Getter private final AtomicBoolean serving; @@ -63,15 +59,15 @@ public class TabletHealthCheck { @Setter private AtomicBoolean retrying; - // ITabletQueryService tabletQueryService; private IParentQueryService queryService; private ReentrantLock lock = new ReentrantLock(); @Getter private AtomicReference tabletStreamHealthDetailStatus = - new AtomicReference<>(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_NONE, "")); + new AtomicReference<>(new TabletStreamHealthDetailStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE, null)); + @Getter private Query.Target target; private HealthCheck healthCheck; @@ -79,16 +75,20 @@ public class TabletHealthCheck { @Getter private Topodata.Tablet tablet; + @Getter private Long masterTermStartTime; private ClientResponseObserver responseObserver; private ClientCallStreamObserver savedClientCallStreamObserver; + @Getter private long lastResponseTimestamp; + @Getter private Query.RealtimeStats stats; + @Getter private AtomicReference lastError; public TabletHealthCheck(HealthCheck healthCheck, Topodata.Tablet tablet, Query.Target target) { @@ -113,7 +113,7 @@ public IQueryService getQueryService() { } } - public IHealthCheckQueryService getHealthCheckQueryService() { + private IHealthCheckQueryService getHealthCheckQueryService() { this.lock.lock(); try { return (IHealthCheckQueryService) getTabletQueryServiceLocked(); @@ -125,36 +125,11 @@ public IHealthCheckQueryService getHealthCheckQueryService() { private IParentQueryService getTabletQueryServiceLocked() { if (this.queryService == null) { this.queryService = TabletDialer.dial(this.tablet); - log.info("create tablet query service: " + TopoProto.tabletToHumanString(tablet)); - } - return this.queryService; - } - - public void closeTabletQueryService(String err) { - this.lastError.set(err); - log.info("close tablet query service: " + TopoProto.tabletToHumanString(tablet) + ", due to error: " + err); - finalizeConn(); - } - - public void finalizeConn() { - this.lock.lock(); - try { - this.serving.set(false); - log.info("final close tablet query service: " + TopoProto.tabletToHumanString(tablet)); - - if (savedClientCallStreamObserver != null) { - savedClientCallStreamObserver.cancel("cancel stream health", null); - savedClientCallStreamObserver = null; - } - - // this.lastError todo set during checkConn - if (this.queryService != null) { - this.queryService.close(); - this.queryService = null; + if (log.isDebugEnabled()) { + log.debug("create tablet query service: " + TopoProto.tabletToHumanString(tablet)); } - } finally { - this.lock.unlock(); } + return this.queryService; } //topo 线程调用 @@ -182,7 +157,6 @@ public void onCompleted() { } }; - try { getHealthCheckQueryService().streamHealth(this, responseObserver); } catch (Throwable e) { @@ -191,7 +165,7 @@ public void onCompleted() { savedClientCallStreamObserver.cancel("streamHealth Exception", e); savedClientCallStreamObserver = null; } - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_ERROR, "init stream health error, need restart stream health")); + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR, "init stream health error, need restart stream health"); } finally { this.lock.unlock(); } @@ -201,7 +175,7 @@ public void startHealthCheckStream() { this.lock.lock(); try { if (savedClientCallStreamObserver != null) { - this.savedClientCallStreamObserver.cancel("stopHealthCheckStream", null); + this.savedClientCallStreamObserver.cancel(STOP_HEALTH_CHECK_STREAM, null); savedClientCallStreamObserver = null; } @@ -212,50 +186,50 @@ public void startHealthCheckStream() { savedClientCallStreamObserver.cancel("streamHealth Exception", e); savedClientCallStreamObserver = null; } - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_ERROR, "start stream health error, need restart stream health")); + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR, "start stream health error, need restart stream health"); } } finally { this.lock.unlock(); } } - public void handleStreamHealthResponse(StreamHealthResponse response) { - lock.lock(); + private void handleStreamHealthResponse(StreamHealthResponse response) { + this.lock.lock(); try { if (this.healthCheckCtxIsCanceled()) { return; } - if (response == null || response.getTarget() == null || response.getRealtimeStats() == null) { - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET, "health stats is not valid " + response.toString())); + if (response == null) { + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET, "health stats is not valid "); return; } - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_RESPONSE, "")); + if (Objects.equals(Query.Target.getDefaultInstance(), response.getTarget())) { + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET, "health stats is not valid " + response); + return; + } + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_RESPONSE, null); String healthError = null; boolean serving = response.getServing(); - if (!isEmptyStr(response.getRealtimeStats().getHealthError())) { + if (StringUtils.isNotEmpty(response.getRealtimeStats().getHealthError())) { healthError = "vttablet error: " + response.getRealtimeStats().getHealthError(); serving = false; } - if (response.getTabletAlias() != null && !TopoProto.tabletAliasEqual(response.getTabletAlias(), this.tablet.getAlias())) { - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_MISMATCH, - "health stats mismatch, tablet " + this.tablet.getAlias() + " alias does not match response alias " + response.getTabletAlias())); + if (!TopoProto.tabletAliasEqual(response.getTabletAlias(), this.tablet.getAlias())) { + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_MISMATCH, + "health stats mismatch, tablet " + this.tablet.getAlias() + " alias does not match response alias " + response.getTabletAlias()); //delete this :handleStreamHealthError already check this error - //this.healthCheck.removeTablet(this.getTablet()); + this.serving.set(false); return; } Query.Target prevTarget = this.target; - boolean trivialUpdate = isEmptyStr(this.lastError.get()) && - this.serving.get() && - isEmptyStr(response.getRealtimeStats().getHealthError()) && - response.getServing() && - prevTarget.getTabletType() != Topodata.TabletType.MASTER && - prevTarget.getTabletType() == response.getTarget().getTabletType() && - this.isTrivialReplagChange(response.getRealtimeStats()); + boolean trivialUpdate = StringUtils.isEmpty(this.lastError.get()) && this.serving.get() && StringUtils.isEmpty(response.getRealtimeStats().getHealthError()) && response.getServing() + && prevTarget.getTabletType() != Topodata.TabletType.MASTER && prevTarget.getTabletType() == response.getTarget().getTabletType() + && this.isTrivialReplagChange(response.getRealtimeStats()); this.lastResponseTimestamp = System.currentTimeMillis(); this.target = response.getTarget(); this.masterTermStartTime = response.getTabletExternallyReparentedTimestamp(); @@ -274,24 +248,23 @@ public void handleStreamHealthResponse(StreamHealthResponse response) { } } - public void handleStreamHealthError(Throwable t) { + private void handleStreamHealthError(Throwable t) { this.serving.set(false); log.error("tablet " + TopoProto.tabletToHumanString(tablet) + " handleStreamHealthError error msg : " + t.getMessage()); if (t.getMessage().toLowerCase().contains("health stats mismatch")) { - //removeTablet 方法会调用 cancelHealthCheckCtx this.healthCheck.removeTablet(this.tablet); return; - } else if (t.getMessage().contains("stopHealthCheckStream")) { + } else if (t.getMessage().contains(STOP_HEALTH_CHECK_STREAM)) { return; } else { boolean logFlag = false; if (t instanceof StatusRuntimeException) { StatusRuntimeException statusRuntimeException = (StatusRuntimeException) t; Status status = statusRuntimeException.getStatus(); - logFlag = "channel closed".equals(status.getDescription()) || - "io exception".equals(status.getDescription()) || - "Keepalive failed. The connection is likely gone".equals(status.getDescription()) || - "Network closed for unknown reason".equals(status.getDescription()); + logFlag = "channel closed".equals(status.getDescription()) + || "io exception".equals(status.getDescription()) + || "Keepalive failed. The connection is likely gone".equals(status.getDescription()) + || "Network closed for unknown reason".equals(status.getDescription()); } if (!logFlag) { log.error("tablet " + TopoProto.tabletToHumanString(tablet) + " handleStreamHealthError error msg : ", t); @@ -305,10 +278,7 @@ public void handleStreamHealthError(Throwable t) { if (t.getMessage().contains("cancel stream health")) { log.info("tablet " + TopoProto.tabletToHumanString(tablet) + "cancel stream health " + this.tablet.getHostname()); } - if (healthCheckCtxIsCanceled()) { - return; - } - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_ERROR, t.getMessage())); + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_ERROR, t.getMessage()); } //release the underlying connection resources. @@ -317,7 +287,7 @@ public void shutdown() { this.lock.lock(); try { if (savedClientCallStreamObserver != null) { - this.savedClientCallStreamObserver.cancel("stopHealthCheckStream", null); + this.savedClientCallStreamObserver.cancel(STOP_HEALTH_CHECK_STREAM, null); savedClientCallStreamObserver = null; } if (this.queryService != null) { @@ -336,22 +306,18 @@ public void closeNativeQueryService() { } public void cancelHealthCheckCtx() { - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_CANCELED, "")); + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED, null); } public boolean healthCheckCtxIsCanceled() { - return this.tabletStreamHealthDetailStatus.get().status == TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED; + return Objects.equals(this.tabletStreamHealthDetailStatus.get().getStatus(), TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED); } - public void handleStreamHealthComplete() { - this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(TABLET_STREAM_HEALTH_STATUS_CANCELED, "")); + private void handleStreamHealthComplete() { + this.setStatus(TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED, null); } - public boolean isEmptyStr(String str) { - return null == str || str.isEmpty(); - } - - public boolean isTrivialReplagChange(Query.RealtimeStats stats) { + private boolean isTrivialReplagChange(Query.RealtimeStats stats) { if (this.stats == null) { return false; } @@ -375,11 +341,26 @@ public enum TabletStreamHealthStatus { TABLET_STREAM_HEALTH_STATUS_ERROR_PACKET } - @Data + private void setStatus(TabletStreamHealthStatus status, String message) { + TabletStreamHealthStatus preStatus = this.tabletStreamHealthDetailStatus.get().getStatus(); + if (Objects.equals(preStatus, TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_CANCELED)) { + log.error("setStatus error preStatus=" + preStatus + " targetStatus=" + status); + return; + } + if (Objects.equals(preStatus, status)) { + return; + } + if (Objects.equals(status, TabletStreamHealthStatus.TABLET_STREAM_HEALTH_STATUS_NONE)) { + return; + } + this.tabletStreamHealthDetailStatus.set(new TabletStreamHealthDetailStatus(status, message)); + } + @AllArgsConstructor - public class TabletStreamHealthDetailStatus { - TabletStreamHealthStatus status; + @Getter + public static class TabletStreamHealthDetailStatus { + private final TabletStreamHealthStatus status; - String message; + private final String message; } } diff --git a/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java b/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java index 78fce24..a00d9e7 100644 --- a/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java +++ b/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java @@ -85,12 +85,7 @@ public static long stateHealthyChecksum(Map> hea StringBuilder sb = new StringBuilder(); for (List tabletHealthCheckList : healthy.values()) { - tabletHealthCheckList.sort(new Comparator() { - @Override - public int compare(TabletHealthCheck o1, TabletHealthCheck o2) { - return Long.compare(o1.getTablet().getAlias().getUid(), o2.getTablet().getAlias().getUid()); - } - }); + tabletHealthCheckList.sort(Comparator.comparingLong(o -> o.getTablet().getAlias().getUid())); for (TabletHealthCheck tabletHealthCheck : tabletHealthCheckList) { if (!tabletHealthCheck.getServing().get()) { diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/EtcdWatcher.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/EtcdWatcher.java index 1213229..f96c001 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/EtcdWatcher.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/EtcdWatcher.java @@ -1,3 +1,21 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package com.jd.jdbc.topo.etcd2topo; import io.etcd.jetcd.ByteSequence; diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/SrvKeyspaceListener.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/SrvKeyspaceListener.java index c9c46a4..7abc453 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/SrvKeyspaceListener.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/SrvKeyspaceListener.java @@ -1,3 +1,21 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package com.jd.jdbc.topo.etcd2topo; import com.jd.jdbc.Executor; diff --git a/src/main/java/com/jd/jdbc/util/threadpool/impl/TabletNettyExecutorService.java b/src/main/java/com/jd/jdbc/util/threadpool/impl/TabletNettyExecutorService.java index 9bb6792..34a1e7d 100644 --- a/src/main/java/com/jd/jdbc/util/threadpool/impl/TabletNettyExecutorService.java +++ b/src/main/java/com/jd/jdbc/util/threadpool/impl/TabletNettyExecutorService.java @@ -1,3 +1,19 @@ +/* +Copyright 2021 JD Project Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package com.jd.jdbc.util.threadpool.impl; import com.jd.jdbc.monitor.ThreadPoolCollector; diff --git a/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java b/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java index c328098..c519516 100644 --- a/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java +++ b/src/main/java/com/jd/jdbc/vitess/metadata/VitessDatabaseMetaData.java @@ -16,6 +16,9 @@ package com.jd.jdbc.vitess.metadata; +import static com.jd.jdbc.common.Constant.DRIVER_MAJOR_VERSION; +import static com.jd.jdbc.common.Constant.DRIVER_MINOR_VERSION; +import static com.jd.jdbc.common.Constant.DRIVER_NAME; import com.jd.jdbc.pool.InnerConnection; import com.jd.jdbc.pool.StatefulConnectionPool; import com.jd.jdbc.queryservice.util.RoleUtils; diff --git a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java index 2eddf8f..5a7db0b 100644 --- a/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java +++ b/src/test/java/com/jd/jdbc/discovery/EtcdTopoServerTest.java @@ -1,3 +1,21 @@ +/* +Copyright 2021 JD Project Authors. Licensed under Apache-2.0. + +Copyright 2019 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package com.jd.jdbc.discovery; import com.jd.jdbc.common.Constant; diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index 7847b92..f88d41b 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -42,34 +42,44 @@ import io.vitess.proto.Topodata; import java.io.IOException; import java.util.ArrayList; +import java.util.ConcurrentModificationException; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.TreeMap; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import lombok.AllArgsConstructor; import lombok.Getter; import org.junit.After; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import testsuite.TestSuite; public class HealthCheckTest extends TestSuite { private static final IContext globalContext = VtContext.withCancel(VtContext.background()); - private final Map portMap = new HashMap<>(); - - private int defaultMysqlPort = 3358; + private static final ExecutorService executorService = getThreadPool(10, 10); @Rule public GrpcCleanupRule grpcCleanup; + @Rule + public ExpectedException thrown = ExpectedException.none(); + + private final Map portMap = new HashMap<>(); + + private int defaultMysqlPort = 3358; + @BeforeClass public static void initPool() { HealthCheck.resetHealthCheck(); @@ -80,6 +90,11 @@ public static void initPool() { VtQueryExecutorService.initialize(null, null, null, null); } + @AfterClass + public static void afterClass() { + executorService.shutdownNow(); + } + @Before public void init() throws IOException { grpcCleanup = new GrpcCleanupRule(); @@ -386,16 +401,14 @@ public void testHealthCheckOnNextBeforeRemove() throws IOException, InterruptedE Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); // remove tablet and send a onNext message parallel - ExecutorService pool = getThreadPool(1, 1); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2); - pool.execute(() -> hc.removeTablet(mockTablet.getTablet())); + executorService.execute(() -> hc.removeTablet(mockTablet.getTablet())); Thread.sleep(200); // healthcheck shouldn't response onNext message Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); closeQueryService(mockTablet); - pool.shutdown(); printOk(); } @@ -423,8 +436,7 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); // remove tablet and send a onNext message parallel - ExecutorService pool = getThreadPool(1, 1); - pool.execute(() -> hc.removeTablet(mockTablet.getTablet())); + executorService.execute(() -> hc.removeTablet(mockTablet.getTablet())); sendOnNextMessage(mockTablet, Topodata.TabletType.REPLICA, true, 0, 0.2, 2); Thread.sleep(200); @@ -432,7 +444,6 @@ public void testHealthCheckOnNextAfterRemove() throws IOException, InterruptedEx Assert.assertEquals("Wrong Tablet data", 0, hc.getHealthByAliasCopy().size()); Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); closeQueryService(mockTablet); - pool.shutdown(); printOk(); } @@ -896,7 +907,6 @@ public void testHealthyListChecksum() { Topodata.Tablet tablet2 = buildTablet("cella", 8, "1.1.1.8", "k", "s", portMap, Topodata.TabletType.REPLICA); Query.Target target = Query.Target.newBuilder().setKeyspace(tablet1.getKeyspace()).setShard(tablet1.getShard()).setTabletType(tablet1.getType()).build(); - Map> healthy1 = hc.getHealthyCopy(); List healthyMap1 = new ArrayList<>(); @@ -953,6 +963,108 @@ public void testHealthyChecksumSetBehindMaster() throws IOException, Interrupted closeQueryService(mockTablet1, mockTablet2); } + @Test + public void testConcurrentModificationException() throws InterruptedException { + thrown.expect(ConcurrentModificationException.class); + + List tabletHealthCheckList = new ArrayList<>(); + String keyspace = "k"; + String shard = "s"; + Topodata.TabletType type = Topodata.TabletType.REPLICA; + Query.Target target = Query.Target.newBuilder().setKeyspace(keyspace).setShard(keyspace).setTabletType(type).build(); + Topodata.Tablet tablet1 = buildTablet("cell1", 1, "a", keyspace, shard, portMap, type); + Topodata.Tablet tablet2 = buildTablet("cell1", 100, "a", keyspace, shard, portMap, type); + Topodata.Tablet tablet3 = buildTablet("cell1", 10, "a", keyspace, shard, portMap, type); + TabletHealthCheck tabletHealthCheck1 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet1, target); + TabletHealthCheck tabletHealthCheck2 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet2, target); + TabletHealthCheck tabletHealthCheck3 = new TabletHealthCheck(HealthCheck.INSTANCE, tablet3, target); + tabletHealthCheckList.add(tabletHealthCheck1); + tabletHealthCheckList.add(tabletHealthCheck2); + tabletHealthCheckList.add(tabletHealthCheck3); + Map> healthy = new ConcurrentHashMap<>(16); + healthy.put(HealthCheck.keyFromTarget(target), tabletHealthCheckList); + + Map> treeMap = new TreeMap<>(healthy); + + int count = 1000000; + executorService.execute(() -> { + for (int i = 0; i < count; i++) { + HealthyCollector.stateHealthyChecksum(treeMap); + } + }); + for (int i = 0; i < count; i++) { + mockGetHealthyTabletStats(healthy, target); + } + + Thread.sleep(2000); + } + + @Test + public void testHealthyConcurrentModificationException() throws InterruptedException, IOException { + HealthCheck hc = getHealthCheck(); + + String keyspace = "k"; + String shard = "s"; + // add replica tablet + MockTablet mockTablet0 = buildMockTablet("cell", 1, "a", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet0.getTablet()); + // add replica tablet + MockTablet mockTablet1 = buildMockTablet("cell", 100, "b", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet1.getTablet()); + // add replica tablet + MockTablet mockTablet2 = buildMockTablet("cell", 10, "c", keyspace, shard, portMap, Topodata.TabletType.REPLICA); + hc.addTablet(mockTablet2.getTablet()); + + Thread.sleep(200); + Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 0, hc.getHealthyCopy().size()); + + printComment("c. Modify the status of Tablet to serving"); + sendOnNextMessage(mockTablet0, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); + sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); + sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 0); + + Thread.sleep(200); + + Assert.assertEquals("Wrong Tablet data", 3, hc.getHealthByAliasCopy().size()); + Assert.assertEquals("Wrong Healthy Tablet data", 1, hc.getHealthyCopy().size()); + List healthyTabletStats = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); + Assert.assertNotNull(healthyTabletStats); + Assert.assertEquals("Wrong Tablet data", 3, healthyTabletStats.size()); + + int count = 1000000; + Map> healthy = hc.getHealthyCopy(); + executorService.execute(() -> { + for (int i = 0; i < count; i++) { + HealthyCollector.stateHealthyChecksum(healthy); + } + }); + for (int i = 0; i < count; i++) { + healthyTabletStats = hc.getHealthyTabletStats(createTarget(Topodata.TabletType.REPLICA)); + Assert.assertNotNull(healthyTabletStats); + Assert.assertEquals("Wrong Tablet data", 3, healthyTabletStats.size()); + } + + Thread.sleep(2000); + } + + private List mockGetHealthyTabletStats(Map> healthy, Query.Target target) { + List list = healthy.get(HealthCheck.keyFromTarget(target)); + if (null == list || list.isEmpty()) { + return null; + } + if (target.getTabletType() == Topodata.TabletType.MASTER) { + return list; + } + List servlist = new ArrayList<>(list.size()); + list.forEach(entry -> { + if (entry.getServing().get()) { + servlist.add(entry); + } + }); + return servlist; + } + private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) { for (String cell : cells) { TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, keyspaceName); diff --git a/src/test/java/com/jd/jdbc/sqlparser/NormalizeRewritePutTest.java b/src/test/java/com/jd/jdbc/sqlparser/NormalizeRewritePutTest.java index 5ae8bb0..cd8a32a 100644 --- a/src/test/java/com/jd/jdbc/sqlparser/NormalizeRewritePutTest.java +++ b/src/test/java/com/jd/jdbc/sqlparser/NormalizeRewritePutTest.java @@ -24,7 +24,6 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.sql.SQLException; -import java.util.ArrayList; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List;