Skip to content

Commit

Permalink
Fix the issue of healthByAlias containing information about VTTablets…
Browse files Browse the repository at this point in the history
… that are already offline. (#116)
  • Loading branch information
wangweicugw authored Aug 10, 2023
1 parent 325f53b commit f7577d1
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 42 deletions.
44 changes: 9 additions & 35 deletions src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -90,6 +91,9 @@ private void loadTablets(IContext ctx) {

private void connectTablets(Map<String, Topodata.Tablet> newTablets) {
if (CollectionUtils.isEmpty(newTablets)) {
for (Map.Entry<String, Topodata.Tablet> entry : currentTablets.entrySet()) {
hc.removeTablet(entry.getValue());
}
return;
}
this.lock.lock();
Expand Down Expand Up @@ -119,24 +123,11 @@ private void connectTablets(Map<String, Topodata.Tablet> newTablets) {
}
}

private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) throws TopoException, InterruptedException {
Map<String, Topodata.Tablet> newTablets;
if (firstLoadTabletsFlag) {
List<Topodata.Tablet> tabletList;
try {
tabletList = ts.getTabletsByRange(ctx, cell);
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getTabletsByRange error,cause by" + e.getMessage());
return null;
}
log.error("get topoTabletInfo fail", e);
// Exception will be thrown when tablet does not exist, so ignore and continue.
// Avoid network abnormalities that cause a large amount of Tablet metadata to be deleted from memory
TopologyCollector.getErrorCounter().labels(cell).inc();
return null;
}
newTablets = new HashMap<>();
List<Topodata.Tablet> tabletList = ts.getTabletsByRange(ctx, cell);
newTablets = new HashMap<>(16);
for (Topodata.Tablet tablet : tabletList) {
if (StringUtils.isEmpty(tablet.getKeyspace())) {
continue;
Expand All @@ -151,18 +142,7 @@ private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
return newTablets;
}

List<Topodata.TabletAlias> tablets;
try {
tablets = ts.getTabletAliasByCell(ctx, cell);
} catch (TopoException e) {
if (TopoException.isErrType(e, TopoExceptionCode.NO_NODE)) {
log.warn("getTabletAliasByCell error,cause by" + e.getMessage());
return null;
}
log.error("Build Tablets fail,current ksSet=" + ksSet, e);
TopologyCollector.getErrorCounter().labels(cell).inc();
return null;
}
List<Topodata.TabletAlias> tablets = ts.getTabletAliasByCell(ctx, cell);
if (CollectionUtils.isEmpty(tablets)) {
return null;
}
Expand Down Expand Up @@ -211,13 +191,7 @@ private Map<String, Topodata.Tablet> getTopoTabletInfoMap(IContext ctx) {
}
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
TopologyCollector.getErrorCounter().labels(cell).inc();
}

countDownLatch.await(10, TimeUnit.SECONDS);
return newTablets;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class Etcd2TopoServer implements TopoConnection {

private static final String END_TAG_OF_RANGE_SEARCH = "1";

private static final int DEFALUT_TIMEOUT = 3;
private static final long DEFALUT_TIMEOUT = 10L;

private static final ConcurrentMap<String, Watch.Watcher> WATCHER_MAP = new ConcurrentHashMap<>(16);

Expand Down
10 changes: 4 additions & 6 deletions src/test/java/com/jd/jdbc/topo/MemoryTopoFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@
import org.apache.commons.lang3.RandomUtils;

@Getter
public class MemoryTopoFactory implements TopoFactory{
public class MemoryTopoFactory implements TopoFactory {

private static final String GLOBAL_CELL = "global";

private final Map<String, MemoryTopoServer.Node> cells;

private long generation;

private final static String GLOBAL_CELL = "global";

private MemoryTopoFactory() {
cells = new ConcurrentHashMap<>();
generation = RandomUtils.nextLong();
}

public static ServerWithFactory newServerAndFactory(String...cells) throws TopoException {
public static ServerWithFactory newServerAndFactory(String... cells) throws TopoException {
MemoryTopoFactory factory = new MemoryTopoFactory();
factory.getCells().put(GLOBAL_CELL, factory.newDirectory(GLOBAL_CELL, null));
TopoServer topoServer = Topo.newWithFactory(factory, "", "");
Expand Down Expand Up @@ -113,8 +113,6 @@ public MemoryTopoServer.Node getOrCreatePath(String cell, String filePath) {
return node;
}



@Override
public Boolean hasGlobalReadOnlyCell(String name, String id) {
return false;
Expand Down

0 comments on commit f7577d1

Please sign in to comment.