Skip to content

Commit

Permalink
The cell parameter is omitted from the connection database URL (#132)
Browse files Browse the repository at this point in the history
* The cell parameter is omitted from the connection database URL
  • Loading branch information
wlx5575 authored Aug 24, 2023
1 parent 964062c commit e9d5536
Show file tree
Hide file tree
Showing 22 changed files with 587 additions and 262 deletions.
2 changes: 1 addition & 1 deletion src/main/java/com/jd/jdbc/discovery/HealthCheck.java
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ public void removeTablet(Topodata.Tablet tablet) {
try {
thc = this.healthByAlias.get(tabletAlias);
if (thc == null) {
log.error("we have no health data for tablet " + TopoProto.tabletToHumanString(tablet) + ", it might have been delete already");
log.info("we have no health data for tablet " + TopoProto.tabletToHumanString(tablet) + ", it might have been delete already");
return;
}

Expand Down
44 changes: 36 additions & 8 deletions src/main/java/com/jd/jdbc/discovery/TopologyWatcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -78,26 +78,40 @@ public TopologyWatcher(TopoServer ts, String cell, String tabletKeyspace) {
log.info("start topo watcher for cell: " + cell);
}

public TopologyWatcher(TopoServer ts, String cell, Set<String> tabletKeyspaces) {
this.ts = ts;
this.hc = HealthCheck.INSTANCE;
this.cell = cell;
this.ksSet.addAll(tabletKeyspaces);
this.firstLoadTabletsFlag = true;
log.info("start topo watcher for cell: " + cell);
}

private void loadTablets(IContext ctx) {
try {
Map<String, Topodata.Tablet> newTablets = getTopoTabletInfoMap(ctx);
connectTablets(newTablets);
TopologyCollector.getCounter().labels(cell).inc();
} catch (Throwable t) {
TopologyCollector.getErrorCounter().labels(cell).inc();
log.error("Unexpected error occur at loadTablets, cause: " + t.getMessage(), t);
log.error("Unexpected Throwable error occur at loadTablets, cause: " + t.getMessage(), t);
}
}

private void connectTablets(Map<String, Topodata.Tablet> newTablets) {
if (CollectionUtils.isEmpty(newTablets)) {
for (Map.Entry<String, Topodata.Tablet> entry : currentTablets.entrySet()) {
hc.removeTablet(entry.getValue());
}
return;
}
this.lock.lock();
try {
if (CollectionUtils.isEmpty(newTablets)) {
if (currentTablets == null || currentTablets.isEmpty()) {
return;
}
for (Map.Entry<String, Topodata.Tablet> entry : currentTablets.entrySet()) {
hc.removeTablet(entry.getValue());
}
currentTablets = new ConcurrentHashMap<>();
return;
}

for (Map.Entry<String, Topodata.Tablet> entry : newTablets.entrySet()) {
Topodata.Tablet newTablet = entry.getValue();
Topodata.Tablet oldTablet = currentTablets.get(entry.getKey());
Expand Down Expand Up @@ -208,8 +222,19 @@ public void watchKeyspace(IContext ctx, String tabletKeyspace) {
if (ksSet.contains(tabletKeyspace)) {
return;
}

this.lock.lock();
try {
if (ksSet.contains(tabletKeyspace)) {
return;
}
this.ksSet.add(tabletKeyspace);
} finally {
this.lock.unlock();
}

log.info("topo watcher for cell " + cell + " watches: " + tabletKeyspace);
this.ksSet.add(tabletKeyspace);

Set<Topodata.Tablet> tablets = this.ignoreTopo.watchKs(tabletKeyspace);
if (CollectionUtils.isEmpty(tablets)) {
return;
Expand All @@ -229,6 +254,9 @@ public void watchKeyspace(IContext ctx, String tabletKeyspace) {

public void close() {
timer.cancel();
if (currentTablets == null) {
return;
}
for (Map.Entry<String, Topodata.Tablet> entry : currentTablets.entrySet()) {
hc.removeTablet(entry.getValue());
}
Expand Down
90 changes: 89 additions & 1 deletion src/main/java/com/jd/jdbc/discovery/TopologyWatcherManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,22 @@

package com.jd.jdbc.discovery;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.jd.jdbc.common.util.CollectionUtils;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.topo.TopoException;
import com.jd.jdbc.topo.TopoServer;
import com.jd.jdbc.util.threadpool.VtThreadFactoryBuilder;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -31,17 +43,36 @@ public enum TopologyWatcherManager {
*/
INSTANCE;

private Map<String, TopologyWatcher> cellTopologyWatcherMap = null;

private Map<String, Set<String>> globalKeyspacesMap = null;

private final Lock lock = new ReentrantLock();

private Map<String, TopologyWatcher> cellTopologyWatcherMap = null;
private static final Log LOGGER = LogFactory.getLog(TopologyWatcherManager.class);

private ScheduledThreadPoolExecutor scheduledExecutor;

TopologyWatcherManager() {
cellTopologyWatcherMap = new ConcurrentHashMap<>(16);
globalKeyspacesMap = new ConcurrentHashMap<>(16);

scheduledExecutor = new ScheduledThreadPoolExecutor(1, VtThreadFactoryBuilder.build("reload-cell-schedule"));
scheduledExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduledExecutor.setRemoveOnCancelPolicy(true);
}

public void startWatch(IContext ctx, TopoServer topoServer, String cell, String tabletKeyspace) {
lock.lock();
try {
String serverAddress = topoServer.getServerAddress();
if (!globalKeyspacesMap.containsKey(serverAddress)) {
globalKeyspacesMap.put(serverAddress, new HashSet<>());

startTickerReloadCell(ctx, topoServer, TimeUnit.MINUTES);
}
globalKeyspacesMap.get(serverAddress).add(tabletKeyspace);

if (!cellTopologyWatcherMap.containsKey(cell)) {
TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, tabletKeyspace);
topologyWatcher.start(ctx);
Expand All @@ -60,10 +91,67 @@ public void watch(IContext ctx, String cell, String tabletKeyspace) {
}

public void close() {
closeScheduledExecutor();

for (Map.Entry<String, TopologyWatcher> entry : cellTopologyWatcherMap.entrySet()) {
TopologyWatcher topologyWatcher = entry.getValue();
topologyWatcher.close();
}
cellTopologyWatcherMap.clear();
globalKeyspacesMap.clear();
}

public void resetScheduledExecutor() {
closeScheduledExecutor();

ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("reload-cell-schedule").setDaemon(true).build();
scheduledExecutor = new ScheduledThreadPoolExecutor(1, threadFactory);
}

public void closeScheduledExecutor() {
scheduledExecutor.shutdownNow();
try {
int tryAgain = 3;
while (tryAgain > 0 && !scheduledExecutor.awaitTermination(1, TimeUnit.SECONDS)) {
tryAgain--;
}
} catch (InterruptedException e) {
// We're shutting down anyway, so just ignore.
}
}

public boolean isWatching(String cell) {
return cellTopologyWatcherMap.containsKey(cell);
}

public void startTickerReloadCell(IContext globalContext, TopoServer topoServer, TimeUnit timeUnit) {
scheduledExecutor.scheduleWithFixedDelay(() -> {
try {
tickerUpdateCells(globalContext, topoServer);
} catch (Throwable e) {
LOGGER.error("tickerUpdateCells error: " + e);
}
}, 5, 10, timeUnit);
}

private void tickerUpdateCells(IContext globalContext, TopoServer topoServer) throws TopoException {
String serverAddress = topoServer.getServerAddress();
Set<String> keyspaceSet = globalKeyspacesMap.get(serverAddress);
if (CollectionUtils.isEmpty(keyspaceSet)) {
throw new RuntimeException("not found keyspace in " + serverAddress + " of TopologyWatcherManager.globalKeyspacesMap .");
}
List<String> allCells = topoServer.getAllCells(globalContext);
for (String cell : allCells) {
if (!isWatching(cell)) {
lock.lock();
try {
TopologyWatcher topologyWatcher = new TopologyWatcher(topoServer, cell, keyspaceSet);
topologyWatcher.start(globalContext);
cellTopologyWatcherMap.put(cell, topologyWatcher);
} finally {
lock.unlock();
}
}
}
}
}
14 changes: 8 additions & 6 deletions src/main/java/com/jd/jdbc/topo/Topo.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,13 @@

package com.jd.jdbc.topo;

import com.jd.jdbc.topo.etcd2topo.Etcd2TopoFactory;
import io.vitess.proto.Vtrpc;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static com.jd.jdbc.topo.TopoExceptionCode.NO_IMPLEMENTATION;
import static com.jd.jdbc.topo.TopoServer.CELLS_PATH;
import static com.jd.jdbc.topo.TopoServer.CELL_INFO_FILE;
Expand All @@ -28,12 +35,6 @@
import static com.jd.jdbc.topo.TopoServer.TABLETS_PATH;
import static com.jd.jdbc.topo.TopoServer.TABLET_FILE;
import static com.jd.jdbc.topo.TopoServer.VSCHEMA_FILE;
import com.jd.jdbc.topo.etcd2topo.Etcd2TopoFactory;
import io.vitess.proto.Vtrpc;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Topo {

Expand Down Expand Up @@ -125,6 +126,7 @@ protected static TopoServer newWithFactory(TopoFactory topoFactory, String serve
topoServer.globalReadOnlyCell = connReadOnly;
topoServer.topoFactory = topoFactory;
topoServer.cells = new HashMap<>(16);
topoServer.serverAddress = serverAddress;
return topoServer;
}

Expand Down
51 changes: 45 additions & 6 deletions src/main/java/com/jd/jdbc/topo/TopoServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,9 @@
import com.google.protobuf.InvalidProtocolBufferException;
import com.jd.jdbc.context.IContext;
import com.jd.jdbc.key.CurrentShard;
import static com.jd.jdbc.topo.Topo.pathForCellInfo;
import static com.jd.jdbc.topo.Topo.pathForSrvKeyspaceFile;
import static com.jd.jdbc.topo.Topo.pathForTabletAlias;
import static com.jd.jdbc.topo.Topo.pathForVschemaFile;
import static com.jd.jdbc.topo.TopoConnection.ConnGetResponse;
import com.jd.jdbc.sqlparser.support.logging.Log;
import com.jd.jdbc.sqlparser.support.logging.LogFactory;
import com.jd.jdbc.topo.TopoConnection.DirEntry;
import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE;
import com.jd.jdbc.topo.topoproto.TopoProto;
import io.vitess.proto.Topodata;
import java.util.ArrayList;
Expand All @@ -41,8 +37,17 @@
import java.util.concurrent.locks.ReentrantLock;
import vschema.Vschema;

import static com.jd.jdbc.topo.Topo.pathForCellInfo;
import static com.jd.jdbc.topo.Topo.pathForSrvKeyspaceFile;
import static com.jd.jdbc.topo.Topo.pathForTabletAlias;
import static com.jd.jdbc.topo.Topo.pathForVschemaFile;
import static com.jd.jdbc.topo.TopoConnection.ConnGetResponse;
import static com.jd.jdbc.topo.TopoExceptionCode.NO_NODE;

public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, TopoTablet, TopoVschema {

private static final Log log = LogFactory.getLog(TopoServer.class);

static final String GLOBAL_CELL = "global";

static final String GLOBAL_READ_ONLY_CELL = "global-read-only";
Expand Down Expand Up @@ -73,6 +78,8 @@ public class TopoServer implements Resource, TopoCellInfo, TopoSrvKeyspace, Topo

Map<String, TopoConnection> cells;

String serverAddress;

/**
*
*/
Expand Down Expand Up @@ -183,6 +190,33 @@ public static Map<String, Topodata.SrvKeyspace> getSrvkeyspaceMapCopy() {
return new HashMap<>(SRVKEYSPACE_MAP);
}

public List<String> getAllCells(IContext ctx) throws TopoException {
List<TopoConnection.DirEntry> dirEntryList = this.globalCell.listDir(ctx, CELLS_PATH, false, false);
List<String> cells = Topo.dirEntriesToStringArray(dirEntryList);
if (cells.size() < 1) {
throw TopoException.wrap("Cells Information missing");
}
return cells;
}

public String getLocalCell(IContext globalContext, TopoServer topoServer, List<String> cells, String defaultKeyspace) throws TopoException {
String localCell = cells.get(0);
for (String cell : cells) {
try {
Topodata.SrvKeyspace getSrvKeyspace = topoServer.getSrvKeyspace(globalContext, cell, defaultKeyspace);
if (getSrvKeyspace != null) {
localCell = cell;
return localCell;
}
} catch (TopoException e) {
if (e.getCode() != TopoExceptionCode.NO_NODE) {
throw TopoException.wrap(e.getMessage());
}
}
}
return localCell;
}

/**
* @param ctx
* @param cell
Expand Down Expand Up @@ -309,6 +343,7 @@ public List<Topodata.TabletAlias> getTabletAliasByCell(IContext ctx, String cell
return result;
} catch (TopoException e) {
if (TopoException.isErrType(e, NO_NODE)) {
log.debug("failed to get tablet in cell " + cell + " . error: " + e);
return null;
}
throw e;
Expand Down Expand Up @@ -352,4 +387,8 @@ public Vschema.Keyspace getVschema(IContext ctx, String keyspaceName) throws Top
}
return keyspace;
}

public String getServerAddress() {
return this.serverAddress;
}
}
2 changes: 1 addition & 1 deletion src/main/java/com/jd/jdbc/topo/TopoStatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public CompletableFuture<List<DirEntry>> listDirFuture(IContext ctx, String dirP

@Override
public Version create(IContext ctx, String filePath, byte[] contents) throws TopoException {
return null;
return this.topoConnection.create(ctx, filePath, contents);
}

@Override
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/com/jd/jdbc/util/KeyspaceUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@

package com.jd.jdbc.util;

import static com.jd.jdbc.common.Constant.DEFAULT_DATABASE_PREFIX;
import io.netty.util.internal.StringUtil;

import static com.jd.jdbc.common.Constant.DEFAULT_DATABASE_PREFIX;

public class KeyspaceUtil {
public static String getLogicSchema(String tableCat) {
if (StringUtil.isNullOrEmpty(tableCat)) {
Expand All @@ -37,4 +38,8 @@ public static String getRealSchema(String tableCat) {
}
return DEFAULT_DATABASE_PREFIX + tableCat;
}

public static String getTabletKeyspace(String keyspace) {
return keyspace;
}
}
Loading

0 comments on commit e9d5536

Please sign in to comment.