diff --git a/src/main/java/com/jd/jdbc/common/util/CollectionUtils.java b/src/main/java/com/jd/jdbc/common/util/CollectionUtils.java index e9231e9..178b233 100644 --- a/src/main/java/com/jd/jdbc/common/util/CollectionUtils.java +++ b/src/main/java/com/jd/jdbc/common/util/CollectionUtils.java @@ -47,4 +47,5 @@ public static boolean isNotEmpty(Map map) { public static boolean isNotEmpty(Object[] array) { return array != null && array.length > 0; } + } diff --git a/src/main/java/com/jd/jdbc/common/util/Crc32Utill.java b/src/main/java/com/jd/jdbc/common/util/Crc32Utill.java new file mode 100644 index 0000000..bcf677b --- /dev/null +++ b/src/main/java/com/jd/jdbc/common/util/Crc32Utill.java @@ -0,0 +1,38 @@ +/* +Copyright 2021 JD Project Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package com.jd.jdbc.common.util; + +import java.util.zip.CRC32; + +public class Crc32Utill { + + private Crc32Utill() { + } + + public static long checksumByCrc32(byte[] b) { + CRC32 crc32 = new CRC32(); + crc32.update(b); + return crc32.getValue(); + } + + public static long checksumByCrc32(int b) { + CRC32 crc32 = new CRC32(); + crc32.update(b); + return crc32.getValue(); + } + +} diff --git a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java index 01417b4..3a57e7d 100644 --- a/src/main/java/com/jd/jdbc/discovery/HealthCheck.java +++ b/src/main/java/com/jd/jdbc/discovery/HealthCheck.java @@ -41,6 +41,7 @@ import java.util.Objects; import java.util.Timer; import java.util.TimerTask; +import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicInteger; @@ -131,7 +132,7 @@ public Map getHealthByAliasCopy() { } public Map> getHealthyCopy() { - return new HashMap<>(healthy); + return new TreeMap<>(healthy); } public IQueryService tabletConnection(Topodata.TabletAlias alias) { diff --git a/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java b/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java index 57f0b68..81dcca4 100644 --- a/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java +++ b/src/main/java/com/jd/jdbc/monitor/HealthyCollector.java @@ -16,11 +16,14 @@ package com.jd.jdbc.monitor; +import com.google.common.collect.Lists; +import com.jd.jdbc.common.util.Crc32Utill; import com.jd.jdbc.discovery.HealthCheck; import com.jd.jdbc.discovery.TabletHealthCheck; import io.prometheus.client.Collector; import io.prometheus.client.GaugeMetricFamily; -import java.util.Collections; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Map; @@ -31,6 +34,12 @@ public final class HealthyCollector extends Collector { private static final HealthyCollector HEALTH_CHECK_COLLECTOR = new HealthyCollector(); + private static final List LABEL_NAME_HEALTHY = Lists.newArrayList("HealthyChecksum"); + + private static final String COLLECT_NAME_HEALTHY = "HealthyChecksum"; + + private static final String COLLECT_HELP_HEALTHY = "crc32 checksum of the current healthCheck.healthy state "; + private HealthyCollector() { } @@ -55,6 +64,54 @@ public List collect() { HealthCheckCollector.buildGaugeMetric(labeledGauge, notServing, tabletHealthCheck); } } - return Collections.singletonList(labeledGauge); + + GaugeMetricFamily labeledGaugeCheckSum = collectChecksum(healthyCopy); + + List ret = new ArrayList<>(); + ret.add(labeledGauge); + ret.add(labeledGaugeCheckSum); + return ret; + } + + public GaugeMetricFamily collectChecksum(Map> healthy) { + GaugeMetricFamily labeledGaugeSum = new GaugeMetricFamily(COLLECT_NAME_HEALTHY, COLLECT_HELP_HEALTHY, LABEL_NAME_HEALTHY); + + long crc32Val = stateHealthyChecksum(healthy); + List healthyLV = Lists.newArrayList(Long.toString(crc32Val)); + labeledGaugeSum.addMetric(healthyLV, crc32Val); + return labeledGaugeSum; + } + + public static long stateHealthyChecksum(Map> healthy) { + StringBuilder sb = new StringBuilder(); + + for (List tabletHealthCheckList : healthy.values()) { + tabletHealthCheckList.sort(new Comparator() { + @Override + public int compare(TabletHealthCheck o1, TabletHealthCheck o2) { + return Long.compare(o1.getTablet().getAlias().getUid(), o2.getTablet().getAlias().getUid()); + } + }); + + for (TabletHealthCheck tabletHealthCheck : tabletHealthCheckList) { + if (!tabletHealthCheck.getServing().get()) { + // ignore noserving + continue; + } + sb.append(tabletHealthCheck.getTarget().getCell()); + sb.append(tabletHealthCheck.getTarget().getKeyspace()); + sb.append(tabletHealthCheck.getTarget().getShard()); + sb.append(tabletHealthCheck.getTarget().getTabletType()); + sb.append("\n"); + sb.append(tabletHealthCheck.getTablet().getAlias()); + sb.append(tabletHealthCheck.getTablet().getHostname()); + sb.append(tabletHealthCheck.getTablet().getMysqlPort()); + sb.append("\n"); + sb.append(tabletHealthCheck.getMasterTermStartTime()); + sb.append("\n"); + } + } + return Crc32Utill.checksumByCrc32(sb.toString().getBytes()); } + } diff --git a/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java b/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java index df2d665..fd6edb8 100644 --- a/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java +++ b/src/main/java/com/jd/jdbc/srvtopo/ResilientServer.java @@ -19,13 +19,13 @@ package com.jd.jdbc.srvtopo; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.jd.jdbc.common.util.Crc32Utill; import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.key.CurrentShard; import com.jd.jdbc.monitor.SrvKeyspaceCollector; import com.jd.jdbc.sqlparser.support.logging.Log; import com.jd.jdbc.sqlparser.support.logging.LogFactory; -import com.jd.jdbc.sqlparser.utils.Utils; import com.jd.jdbc.topo.Topo; import com.jd.jdbc.topo.TopoException; import com.jd.jdbc.topo.TopoExceptionCode; @@ -93,12 +93,19 @@ public class ResilientServer implements SrvTopoServer { public List getSrvKeyspaceCollectorInfo() { List infoList = new ArrayList<>(); Map map = new HashMap<>(srvKeyspaceCache); + for (Map.Entry entry : map.entrySet()) { String keyspaceCell = entry.getKey(); String[] split = keyspaceCell.split("\\."); Topodata.SrvKeyspace srvKeyspace = entry.getValue().value; - String md5 = srvKeyspace == null ? "" : Utils.md5(srvKeyspace.toString()); - SrvKeyspaceCollector.Info info = new SrvKeyspaceCollector.Info(md5, split[0], split[1]); + long infoCrc32; + + if (srvKeyspace == null) { + infoCrc32 = 0; + } else { + infoCrc32 = Crc32Utill.checksumByCrc32(srvKeyspace.toString().getBytes()); + } + SrvKeyspaceCollector.Info info = new SrvKeyspaceCollector.Info(Long.toString(infoCrc32), split[0], split[1]); infoList.add(info); } return infoList; diff --git a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java index 02f6980..766f5e4 100644 --- a/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java +++ b/src/test/java/com/jd/jdbc/discovery/HealthCheckTest.java @@ -21,6 +21,7 @@ import com.jd.jdbc.context.IContext; import com.jd.jdbc.context.VtContext; import com.jd.jdbc.discovery.TabletHealthCheck.TabletStreamHealthStatus; +import com.jd.jdbc.monitor.HealthyCollector; import com.jd.jdbc.queryservice.CombinedQueryService; import com.jd.jdbc.queryservice.IParentQueryService; import com.jd.jdbc.queryservice.MockQueryServer; @@ -40,6 +41,7 @@ import io.vitess.proto.Query; import io.vitess.proto.Topodata; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -839,6 +841,70 @@ public void testMysqlPort3358to0() throws IOException, InterruptedException { printOk(); } + @Test + public void testHealthyListChecksum() { + HealthCheck hc = getHealthCheck(); + Topodata.Tablet tablet1 = buildTablet("cella", 7, "1.1.1.7", "k", "s", portMap, Topodata.TabletType.REPLICA); + Topodata.Tablet tablet2 = buildTablet("cella", 8, "1.1.1.8", "k", "s", portMap, Topodata.TabletType.REPLICA); + Query.Target target = Query.Target.newBuilder().setKeyspace(tablet1.getKeyspace()).setShard(tablet1.getShard()).setTabletType(tablet1.getType()).build(); + + + Map> healthy1 = hc.getHealthyCopy(); + List healthyMap1 = new ArrayList<>(); + + TabletHealthCheck thc1 = new TabletHealthCheck(null, tablet1, target); + thc1.getServing().set(true); + TabletHealthCheck thc2 = new TabletHealthCheck(null, tablet2, target); + thc2.getServing().set(true); + + healthyMap1.add(thc1); + healthyMap1.add(thc2); + healthy1.put("k1", healthyMap1); + + Map> healthy2 = hc.getHealthyCopy(); + List healthyMap2 = new ArrayList<>(); + healthyMap2.add(thc2); + healthyMap2.add(thc1); + healthy2.put("k1", healthyMap2); + + long healthy1Crc32 = HealthyCollector.stateHealthyChecksum(healthy1); + long healthy2Crc32 = HealthyCollector.stateHealthyChecksum(healthy2); + Assert.assertEquals("Wrong HealthyChecksum", healthy1Crc32, healthy2Crc32); + } + + @Test + public void testHealthyChecksumSetBehindMaster() throws IOException, InterruptedException { + HealthCheck hc = getHealthCheck(); + // add tablet + String keyInHealthy = "k.s.replica"; + MockTablet mockTablet1 = buildMockTablet("cella", 7, "1.1.1.7", "k", "s", portMap, Topodata.TabletType.REPLICA); + MockTablet mockTablet2 = buildMockTablet("cella", 8, "1.1.1.8", "k", "s", portMap, Topodata.TabletType.REPLICA); + + hc.addTablet(mockTablet1.getTablet()); + hc.addTablet(mockTablet2.getTablet()); + Thread.sleep(200); + sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); + sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 2); + Thread.sleep(200); + + // sort list in healthy order by secondsBehindMaster + hc.recomputeHealthyLocked(keyInHealthy); + long firstCrc32 = HealthyCollector.stateHealthyChecksum(hc.getHealthyCopy()); + + sendOnNextMessage(mockTablet1, Topodata.TabletType.REPLICA, true, 0, 0.5, 2); + sendOnNextMessage(mockTablet2, Topodata.TabletType.REPLICA, true, 0, 0.5, 1); + Thread.sleep(200); + + // sort list in healthy order by secondsBehindMaster + hc.recomputeHealthyLocked(keyInHealthy); + long secondCrc32 = HealthyCollector.stateHealthyChecksum(hc.getHealthyCopy()); + + Assert.assertNotEquals(hc.getHealthyCopy().get(keyInHealthy).get(0).getTablet().getHostname(), hc.getHealthyCopy().get(keyInHealthy).get(1).getTablet().getHostname()); + Assert.assertEquals("Wrong HealthyChecksum", firstCrc32, secondCrc32); + + closeQueryService(mockTablet1, mockTablet2); + } + private void startWatchTopo(String keyspaceName, TopoServer topoServer, String... cells) { for (String cell : cells) { TopologyWatcherManager.INSTANCE.startWatch(globalContext, topoServer, cell, keyspaceName);