diff --git a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java index be58d52..70e5200 100644 --- a/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java +++ b/src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java @@ -42,6 +42,7 @@ import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -90,6 +91,9 @@ private void loadTablets(IContext ctx) { private void connectTablets(Map newTablets) { if (CollectionUtils.isEmpty(newTablets)) { + for (Map.Entry entry : currentTablets.entrySet()) { + hc.removeTablet(entry.getValue()); + } return; } this.lock.lock(); @@ -119,24 +123,11 @@ private void connectTablets(Map newTablets) { } } - private Map getTopoTabletInfoMap(IContext ctx) { + private Map getTopoTabletInfoMap(IContext ctx) throws TopoException, InterruptedException { Map newTablets; if (firstLoadTabletsFlag) { - List tabletList; - try { - tabletList = ts.getTabletsByRange(ctx, cell); - } catch (TopoException e) { - if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { - log.warn("getTabletsByRange error,cause by" + e.getMessage()); - return null; - } - log.error("get topoTabletInfo fail", e); - // Exception will be thrown when tablet does not exist, so ignore and continue. - // Avoid network abnormalities that cause a large amount of Tablet metadata to be deleted from memory - TopologyCollector.getErrorCounter().labels(cell).inc(); - return null; - } - newTablets = new HashMap<>(); + List tabletList = ts.getTabletsByRange(ctx, cell); + newTablets = new HashMap<>(16); for (Topodata.Tablet tablet : tabletList) { if (StringUtils.isEmpty(tablet.getKeyspace())) { continue; @@ -151,18 +142,7 @@ private Map getTopoTabletInfoMap(IContext ctx) { return newTablets; } - List tablets; - try { - tablets = ts.getTabletAliasByCell(ctx, cell); - } catch (TopoException e) { - if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) { - log.warn("getTabletAliasByCell error,cause by" + e.getMessage()); - return null; - } - log.error("Build Tablets fail,current ksSet=" + ksSet, e); - TopologyCollector.getErrorCounter().labels(cell).inc(); - return null; - } + List tablets = ts.getTabletAliasByCell(ctx, cell); if (CollectionUtils.isEmpty(tablets)) { return null; } @@ -211,13 +191,7 @@ private Map getTopoTabletInfoMap(IContext ctx) { } }); } - try { - countDownLatch.await(); - } catch (InterruptedException e) { - log.error(e.getMessage(), e); - TopologyCollector.getErrorCounter().labels(cell).inc(); - } - + countDownLatch.await(10, TimeUnit.SECONDS); return newTablets; } diff --git a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java index d3acda3..20b9377 100644 --- a/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java +++ b/src/main/java/com/jd/jdbc/topo/etcd2topo/Etcd2TopoServer.java @@ -53,7 +53,7 @@ public class Etcd2TopoServer implements TopoConnection { private static final String END_TAG_OF_RANGE_SEARCH = "1"; - private static final int DEFALUT_TIMEOUT = 3; + private static final long DEFALUT_TIMEOUT = 10L; private static final ConcurrentMap WATCHER_MAP = new ConcurrentHashMap<>(16); diff --git a/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java b/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java index 350d42c..0b64b35 100644 --- a/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java +++ b/src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java @@ -27,20 +27,20 @@ import org.apache.commons.lang3.RandomUtils; @Getter -public class MemoryTopoFactory implements TopoFactory{ +public class MemoryTopoFactory implements TopoFactory { + + private static final String GLOBAL_CELL = "global"; private final Map cells; private long generation; - private final static String GLOBAL_CELL = "global"; - private MemoryTopoFactory() { cells = new ConcurrentHashMap<>(); generation = RandomUtils.nextLong(); } - public static ServerWithFactory newServerAndFactory(String...cells) throws TopoException { + public static ServerWithFactory newServerAndFactory(String... cells) throws TopoException { MemoryTopoFactory factory = new MemoryTopoFactory(); factory.getCells().put(GLOBAL_CELL, factory.newDirectory(GLOBAL_CELL, null)); TopoServer topoServer = Topo.newWithFactory(factory, "", ""); @@ -113,8 +113,6 @@ public MemoryTopoServer.Node getOrCreatePath(String cell, String filePath) { return node; } - - @Override public Boolean hasGlobalReadOnlyCell(String name, String id) { return false;