diff --git a/src/main/java/io/mycat/backend/mysql/nio/handler/FetchStoreNodeOfChildTableHandler.java b/src/main/java/io/mycat/backend/mysql/nio/handler/FetchStoreNodeOfChildTableHandler.java index 1a2f749c7..548cb22ab 100644 --- a/src/main/java/io/mycat/backend/mysql/nio/handler/FetchStoreNodeOfChildTableHandler.java +++ b/src/main/java/io/mycat/backend/mysql/nio/handler/FetchStoreNodeOfChildTableHandler.java @@ -23,6 +23,7 @@ */ package io.mycat.backend.mysql.nio.handler; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; @@ -32,13 +33,13 @@ import io.mycat.MycatServer; import io.mycat.backend.BackendConnection; -import io.mycat.backend.ConnectionMeta; import io.mycat.backend.datasource.PhysicalDBNode; import io.mycat.cache.CachePool; import io.mycat.config.MycatConfig; import io.mycat.net.mysql.ErrorPacket; import io.mycat.net.mysql.RowDataPacket; import io.mycat.route.RouteResultsetNode; +import io.mycat.server.ServerConnection; import io.mycat.server.parser.ServerParse; /** @@ -56,6 +57,67 @@ public class FetchStoreNodeOfChildTableHandler implements ResponseHandler { private volatile String dataNode; private AtomicInteger finished = new AtomicInteger(0); protected final ReentrantLock lock = new ReentrantLock(); + + public String execute(String schema, String sql, List dataNodes, ServerConnection sc) { + + String key = schema + ":" + sql; + CachePool cache = MycatServer.getInstance().getCacheService() + .getCachePool("ER_SQL2PARENTID"); + String result = (String) cache.get(key); + if (result != null) { + return result; + } + this.sql = sql; + int totalCount = dataNodes.size(); + long startTime = System.currentTimeMillis(); + long endTime = startTime + 5 * 60 * 1000L; + MycatConfig conf = MycatServer.getInstance().getConfig(); + + LOGGER.debug("find child node with sql:" + sql); + for (String dn : dataNodes) { + if (dataNode != null) { + return dataNode; + } + PhysicalDBNode mysqlDN = conf.getDataNodes().get(dn); + try { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("execute in datanode " + dn); + } + RouteResultsetNode node = new RouteResultsetNode(dn, ServerParse.SELECT, sql); + node.setRunOnSlave(false); // 获取 子表节点,最好走master为好 + + /* + * fix #1370 默认应该先从已经持有的连接中取连接, 否则可能因为事务隔离性看不到当前事务内更新的数据 + * Tips: 通过mysqlDN.getConnection获取到的连接不是当前连接 + * + */ + BackendConnection conn = sc.getSession2().getTarget(node); + if(sc.getSession2().tryExistsCon(conn, node)) { + _execute(conn, node, sc); + } else { + mysqlDN.getConnection(mysqlDN.getDatabase(), sc.isAutocommit(), node, this, node); + } + } catch (Exception e) { + LOGGER.warn("get connection err " + e); + } + } + + while (dataNode == null && System.currentTimeMillis() < endTime) { + try { + Thread.sleep(50); + } catch (InterruptedException e) { + break; + } + if (dataNode != null || finished.get() >= totalCount) { + break; + } + } + if (dataNode != null) { + cache.putIfAbsent(key, dataNode); + } + return dataNode; + + } public String execute(String schema, String sql, ArrayList dataNodes) { String key = schema + ":" + sql; @@ -84,7 +146,7 @@ public String execute(String schema, String sql, ArrayList dataNodes) { RouteResultsetNode node = new RouteResultsetNode(dn, ServerParse.SELECT, sql); node.setRunOnSlave(false); // 获取 子表节点,最好走master为好 - mysqlDN.getConnection(mysqlDN.getDatabase(), true, node, this, dn); + mysqlDN.getConnection(mysqlDN.getDatabase(), true, node, this, node); // mysqlDN.getConnection(mysqlDN.getDatabase(), true, // new RouteResultsetNode(dn, ServerParse.SELECT, sql), @@ -115,6 +177,15 @@ public String execute(String schema, String sql, ArrayList dataNodes) { return dataNode; } + + private void _execute(BackendConnection conn, RouteResultsetNode node, ServerConnection sc) { + conn.setResponseHandler(this); + try { + conn.execute(node, sc, sc.isAutocommit()); + } catch (IOException e) { + connectionError(e, conn); + } + } @Override public void connectionAcquired(BackendConnection conn) { @@ -162,7 +233,7 @@ public void rowResponse(byte[] row, BackendConnection conn) { } if (result == null) { result = getColumn(row); - dataNode = (String) conn.getAttachment(); + dataNode = ((RouteResultsetNode) conn.getAttachment()).getName(); } else { LOGGER.warn("find multi data nodes for child table store, sql is: " + sql); diff --git a/src/main/java/io/mycat/route/util/RouterUtil.java b/src/main/java/io/mycat/route/util/RouterUtil.java index a20ab4dec..c6f3ab99d 100644 --- a/src/main/java/io/mycat/route/util/RouterUtil.java +++ b/src/main/java/io/mycat/route/util/RouterUtil.java @@ -1592,7 +1592,8 @@ public static boolean processERChildTable(final SchemaConfig schema, final Strin @Override public String call() throws Exception { FetchStoreNodeOfChildTableHandler fetchHandler = new FetchStoreNodeOfChildTableHandler(); - return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes()); +// return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes()); + return fetchHandler.execute(schema.getName(), findRootTBSql, tc.getRootParent().getDataNodes(), sc); } }); @@ -1605,6 +1606,9 @@ public void onSuccess(String result) { StringBuilder s = new StringBuilder(); LOGGER.warn(s.append(sc.getSession2()).append(origSQL).toString() + " err:" + "can't find (root) parent sharding node for sql:" + origSQL); + if(!sc.isAutocommit()) { // 处于事务下失败, 必须回滚 + sc.setTxInterrupt("can't find (root) parent sharding node for sql:" + origSQL); + } sc.writeErrMessage(ErrorCode.ER_PARSE_ERROR, "can't find (root) parent sharding node for sql:" + origSQL); return; }