diff --git a/source/src/main/java/io/mycat/mycat2/ConfigLoader.java b/source/src/main/java/io/mycat/mycat2/ConfigLoader.java index ce61f1b..e756edb 100644 --- a/source/src/main/java/io/mycat/mycat2/ConfigLoader.java +++ b/source/src/main/java/io/mycat/mycat2/ConfigLoader.java @@ -135,13 +135,24 @@ public static List loadMySQLRepBean(String datasourceuri){ repBean.setSwitchType(switchType); List mysqlNodes=getChildNodes(curRepNode,"mysql"); List allMysqls=mysqlNodes.stream().map(mysqlNode->{ - NamedNodeMap attrs = mysqlNode.getAttributes(); - String ip=getAttribute(attrs,"ip",null); - String user=getAttribute(attrs,"user",null); - String password=getAttribute(attrs,"password",null); - int port=getIntAttribute(attrs,"port",3306); - MySQLBean mysql=new MySQLBean(ip,port,user,password); - return mysql;}).collect(Collectors.toList()) ; + NamedNodeMap attrs = mysqlNode.getAttributes(); + String hostName = getAttribute(attrs, "hostname", null); + String ip=getAttribute(attrs,"ip",null); + String user=getAttribute(attrs,"user",null); + String password=getAttribute(attrs,"password",null); + int port=getIntAttribute(attrs,"port",3306); + Integer minCon = getIntAttribute(attrs, "min-con", null); + Integer maxCon = getIntAttribute(attrs, "max-con", null); + + MySQLBean mysql=new MySQLBean(ip,port,user,password); + if (hostName != null) + mysql.setHostName(hostName); + if (minCon != null) + mysql.setMinCon(minCon); + if (maxCon != null) + mysql.setMaxCon(maxCon); + return mysql; + }).collect(Collectors.toList()) ; repBean.setMysqls(allMysqls); list.add(repBean); } @@ -161,7 +172,7 @@ private static String getAttribute(NamedNodeMap map,String attr,String defaultVa { return getValue(map.getNamedItem(attr),defaultVal); } - private static int getIntAttribute(NamedNodeMap map,String attr,int defaultVal) + private static Integer getIntAttribute(NamedNodeMap map,String attr,Integer defaultVal) { return getIntValue(map.getNamedItem(attr),defaultVal); } @@ -169,7 +180,7 @@ private static String getValue(Node node,String defaultVal) { return node==null?defaultVal:node.getNodeValue(); } - private static int getIntValue(Node node,int defaultVal) + private static Integer getIntValue(Node node,Integer defaultVal) { return node==null?defaultVal:Integer.valueOf(node.getNodeValue()); } diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index 8702984..2da4b57 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -4,13 +4,15 @@ import java.nio.ByteBuffer; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map; import io.mycat.mycat2.beans.*; +import io.mycat.mysql.AutoCommit; import io.mycat.mysql.Capabilities; import io.mycat.mysql.Isolation; import io.mycat.mysql.packet.HandshakePacket; import io.mycat.mysql.packet.MySQLPacket; -import io.mycat.proxy.BufferOptState; import io.mycat.proxy.BufferPool; import io.mycat.proxy.ProxyBuffer; import io.mycat.proxy.ProxyRuntime; @@ -49,6 +51,17 @@ public class MySQLSession extends UserProxySession { * 事务隔离级别 */ public Isolation isolation = Isolation.REPEATED_READ; + + //当前接收到的包类型 + public enum CurrPacketType{ + Full,LongHalfPacket,ShortHalfPacket + } + + + /** + * 事务提交方式 + */ + public AutoCommit autoCommit = AutoCommit.OFF; /** * 认证中的seed报文数据 @@ -82,7 +95,7 @@ protected int getServerCapabilities() { /** * 回应客户端(front或Sever)OK 报文。 - * + * * @param pkg * ,必须要是OK报文或者Err报文 * @throws IOException @@ -92,18 +105,20 @@ public void responseOKOrError(MySQLPacket pkg, boolean front) throws IOException frontBuffer.changeOwner(true); pkg.write(this.frontBuffer); frontBuffer.flip(); + frontBuffer.readIndex=frontBuffer.writeIndex; this.writeToChannel(frontBuffer, this.frontChannel); } else { frontBuffer.changeOwner(false); pkg.write(this.frontBuffer); frontBuffer.flip(); + frontBuffer.readIndex=frontBuffer.writeIndex; this.writeToChannel(frontBuffer, this.backendChannel); } } /** * 给客户端(front)发送认证报文 - * + * * @throws IOException */ public void sendAuthPackge() throws IOException { @@ -129,8 +144,9 @@ public void sendAuthPackge() throws IOException { hs.serverStatus = 2; hs.restOfScrambleBuff = rand2; hs.write(this.frontBuffer); - // 进行读取状态的切换,即将写状态切换 为读取状态 + //设置frontBuffer 为读取状态 frontBuffer.flip(); + frontBuffer.readIndex = frontBuffer.writeIndex; this.writeToChannel(frontBuffer, this.frontChannel); } @@ -141,37 +157,41 @@ public MySQLSession(BufferPool bufPool, Selector nioSelector, SocketChannel fron /** * 向前端发送数据报文,需要先确定为Write状态并确保写入位置的正确(frontBuffer.writeState) - * + * * @param rawPkg * @throws IOException */ public void answerFront(byte[] rawPkg) throws IOException { frontBuffer.writeBytes(rawPkg); frontBuffer.flip(); + frontBuffer.readIndex = frontBuffer.writeIndex; writeToChannel(frontBuffer, frontChannel); } /** * 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE * 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true - * + * * @param proxyBuf * @return * @throws IOException */ - public boolean resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded) + public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded) throws IOException { - boolean readWholePkg = false; + ByteBuffer buffer = proxyBuf.getBuffer(); - BufferOptState readState = proxyBuf.readState; // 读取的偏移位置 - int offset = readState.optPostion; + int offset = proxyBuf.readIndex; // 读取的总长度 - int limit = readState.optLimit; + int limit = proxyBuf.writeIndex; // 读取当前的总长度 int totalLen = limit - offset; - if (totalLen == 0) { - return false; + if (totalLen == 0) { //透传情况下. 如果最后一个报文正好在buffer 最后位置,已经透传出去了.这里可能不会为零 + return CurrPacketType.ShortHalfPacket; + } + + if(curPackInf.remainsBytes==0&&curPackInf.crossBuffer){ + curPackInf.crossBuffer = false; } // 如果当前跨多个报文 @@ -179,21 +199,25 @@ public boolean resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPack if (curPackInf.remainsBytes <= totalLen) { // 剩余报文结束 curPackInf.endPos = offset + curPackInf.remainsBytes; + offset += curPackInf.remainsBytes; //继续处理下一个报文 + proxyBuf.readIndex = offset; curPackInf.remainsBytes = 0; - readWholePkg = true; } else {// 剩余报文还没读完,等待下一次读取 + curPackInf.startPos = 0; curPackInf.remainsBytes -= totalLen; curPackInf.endPos = limit; - readWholePkg = false; + proxyBuf.readIndex = curPackInf.endPos; + return CurrPacketType.LongHalfPacket; } } - // 验证当前指针位置是否 - else if (!ParseUtil.validateHeader(offset, limit)) { + //验证当前指针位置是否 + if (!ParseUtil.validateHeader(offset, limit)) { + //收到短半包 logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, limit); - readWholePkg = false; + return CurrPacketType.ShortHalfPacket; } - - // 解包获取包的数据长度 + + //解包获取包的数据长度 int pkgLength = ParseUtil.getPacketLength(buffer, offset); // 解析报文类型 final byte packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize); @@ -203,19 +227,18 @@ else if (!ParseUtil.validateHeader(offset, limit)) { curPackInf.pkgLength = pkgLength; // 设置偏移位置 curPackInf.startPos = offset; - // 设置跨buffer为false + curPackInf.crossBuffer = false; + curPackInf.remainsBytes = 0; // 如果当前需要跨buffer处理 if ((offset + pkgLength) > limit) { logger.debug( - "Not a whole packet: required length = {} bytes, cur total length = {} bytes, " + "Not a whole packet: required length = {} bytes, cur total length = {} bytes, limit ={}, " + "ready to handle the next read event", - getSessionId(), buffer.hashCode(), pkgLength, limit); - curPackInf.crossBuffer = true; - curPackInf.remainsBytes = offset + pkgLength - limit; + pkgLength, (limit-offset),limit); curPackInf.endPos = limit; - readWholePkg = false; + return CurrPacketType.LongHalfPacket; } else { // 读到完整报文 curPackInf.endPos = curPackInf.pkgLength + curPackInf.startPos; @@ -228,13 +251,11 @@ else if (!ParseUtil.validateHeader(offset, limit)) { " session {} packet: startPos={}, offset = {}, length = {}, type = {}, cur total length = {},pkg HEX\r\n {}", getSessionId(), curPackInf.startPos, offset, pkgLength, packetType, limit, hexs); } - readWholePkg = true; - } - if (markReaded) { - readState.optPostion = curPackInf.endPos; + if (markReaded) { + proxyBuf.readIndex = curPackInf.endPos; + } + return CurrPacketType.Full; } - return readWholePkg; - } public void close(boolean normal, String hint) { diff --git a/source/src/main/java/io/mycat/mycat2/MycatSessionManager.java b/source/src/main/java/io/mycat/mycat2/MycatSessionManager.java index fa38f4c..3808a71 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatSessionManager.java +++ b/source/src/main/java/io/mycat/mycat2/MycatSessionManager.java @@ -38,7 +38,8 @@ public MySQLSession createSession(Object keyAttachment, BufferPool bufPool, Sele session.setCurNIOHandler(MySQLClientAuthHandler.INSTANCE); // 默认为透传命令模式 session.curSQLCommand = DirectPassthrouhCmd.INSTANCE; - + // 设置 前端拥有 + session.frontBuffer.changeOwner(true); // 向MySQL Client发送认证报文 session.sendAuthPackge(); session.setSessionManager(this); diff --git a/source/src/main/java/io/mycat/mycat2/beans/MySQLBean.java b/source/src/main/java/io/mycat/mycat2/beans/MySQLBean.java index 711ed5c..68425e9 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/MySQLBean.java +++ b/source/src/main/java/io/mycat/mycat2/beans/MySQLBean.java @@ -32,7 +32,6 @@ public class MySQLBean { private int port; private String user; private String password; - private String defaultSchema = "mysql"; private int maxCon = 1000; private int minCon = 1; @@ -89,14 +88,6 @@ public void setPassword(String password) { this.password = password; } - public String getDefaultSchema() { - return defaultSchema; - } - - public void setDefaultSchema(String defaultSchema) { - this.defaultSchema = defaultSchema; - } - public int getMaxCon() { return maxCon; } @@ -116,7 +107,7 @@ public void setMinCon(int minCon) { @Override public String toString() { return "MySQLBean [hostName=" + hostName + ", ip=" + ip + ", port=" + port + ", user=" + user + ", password=" - + password + ", defaultSchema=" + defaultSchema + ", maxCon=" + maxCon + ", minCon=" + minCon + "]"; + + password + ", maxCon=" + maxCon + ", minCon=" + minCon + "]"; } } diff --git a/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java b/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java index d403abb..8695686 100644 --- a/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java +++ b/source/src/main/java/io/mycat/mycat2/beans/MySQLPackageInf.java @@ -5,7 +5,7 @@ * */ public class MySQLPackageInf { -public int pkgType; +public byte pkgType; public boolean crossBuffer; public int startPos; public int endPos; @@ -14,4 +14,9 @@ public class MySQLPackageInf { * 还有多少字节才结束,仅对跨多个Buffer的MySQL报文有意义(crossBuffer=true) */ public int remainsBytes; +@Override +public String toString() { + return "MySQLPackageInf [pkgType=" + pkgType + ", crossBuffer=" + crossBuffer + ", startPos=" + startPos + + ", endPos=" + endPos + ", pkgLength=" + pkgLength + ", remainsBytes=" + remainsBytes + "]"; +} } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java index e793585..31a296e 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java @@ -2,10 +2,17 @@ import java.io.IOException; import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.mycat.mycat2.MySQLSession; import io.mycat.mycat2.SQLCommand; import io.mycat.mycat2.beans.MySQLPackageInf; +import io.mycat.mysql.packet.MySQLPacket; import io.mycat.proxy.ProxyBuffer; /** @@ -15,31 +22,138 @@ * */ public class DirectPassthrouhCmd implements SQLCommand { - - public static final DirectPassthrouhCmd INSTANCE=new DirectPassthrouhCmd(); + + private static final Logger logger = LoggerFactory.getLogger(DirectPassthrouhCmd.class); + + public static final DirectPassthrouhCmd INSTANCE = new DirectPassthrouhCmd(); + + //********** 临时处理,等待与KK 代码合并 + private static final Map finishPackage = new HashMap<>(); + + private Map curfinishPackage = new HashMap<>(); + + static{ + finishPackage.put(MySQLPacket.OK_PACKET, 1); + finishPackage.put(MySQLPacket.ERROR_PACKET, 1); + finishPackage.put(MySQLPacket.EOF_PACKET, 2); + } + //********** 临时处理,等待与KK 代码合并 + @Override public boolean procssSQL(MySQLSession session, boolean backresReceived) throws IOException { - + if(backresReceived){ + return backendProcessor(session,session.curFrontMSQLPackgInf,session.frontChannel,session.frontBuffer); + }else{ + return frontProcessor(session,backresReceived); + } + } + + /** + * 前端报文处理 + * @param session + * @return + * @throws IOException + */ + private boolean frontProcessor(MySQLSession session, boolean backresReceived) throws IOException{ + curfinishPackage.putAll(finishPackage); ProxyBuffer curBuffer = session.frontBuffer; SocketChannel curChannel = session.backendChannel; - if (backresReceived) {// 收到后端发来的报文 - - curBuffer = session.frontBuffer; - curChannel = session.frontChannel; + // 切换 buffer 读写状态 + curBuffer.flip(); + // 改变 owner + curBuffer.changeOwner(false); + // 没有读取,直接透传时,需要指定 透传的数据 截止位置 + curBuffer.readIndex = curBuffer.writeIndex; + //当前是前端报文处理器,如果是后端报文处理器调用,不切换Owner + session.writeToChannel(curBuffer, curChannel); + return false; + } + + /** + * 后端报文处理 + * @param session + * @return + * @throws IOException + */ + private boolean backendProcessor(MySQLSession session,MySQLPackageInf curMSQLPackgInf, + SocketChannel curChannel,ProxyBuffer curBuffer)throws IOException{ + + if(!session.readFromChannel(session.frontBuffer, session.backendChannel)){ + return false; } - // 直接透传报文 - curBuffer.changeOwner(!curBuffer.frontUsing()); + boolean isallfinish = false; + boolean isContinue = true; + while(isContinue){ + switch(session.resolveMySQLPackage(curBuffer, curMSQLPackgInf,true)){ + case Full: + Integer count = curfinishPackage.get(curMSQLPackgInf.pkgType); + if(count!=null){ + if(--count==0){ + isallfinish = true; + curfinishPackage.clear(); + } + curfinishPackage.put(curMSQLPackgInf.pkgType, count); + } + if(curBuffer.readIndex == curBuffer.writeIndex){ + isContinue = false; + }else{ + isContinue = true; + } + break; + case LongHalfPacket: + if(curMSQLPackgInf.crossBuffer){ + //发生过透传的半包,往往包的长度超过了buffer 的长度. + logger.debug(" readed crossBuffer LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + }else if(!isfinishPackage(curMSQLPackgInf)){ + //不需要整包解析的长半包透传. result set .这种半包直接透传 + curMSQLPackgInf.crossBuffer=true; + curBuffer.readIndex = curMSQLPackgInf.endPos; + curMSQLPackgInf.remainsBytes = curMSQLPackgInf.pkgLength-(curMSQLPackgInf.endPos - curMSQLPackgInf.startPos); + logger.debug(" readed LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + logger.debug(" curBuffer {}", curBuffer); + }else{ + // 读取到了EOF/OK/ERROR 类型长半包 是需要保证是整包的. + logger.debug(" readed finished LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + // TODO 保证整包的机制 + } + isContinue = false; + break; + case ShortHalfPacket: + logger.debug(" readed ShortHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf); + isContinue = false; + break; + } + }; + + //切换buffer 读写状态 curBuffer.flip(); + if(isallfinish){ + curBuffer.changeOwner(true); + } + // 直接透传报文 session.writeToChannel(curBuffer, curChannel); - session.modifySelectKey(); + /** + * 当前命令处理是否全部结束,全部结束时需要清理资源 + */ return false; } + + private boolean isfinishPackage(MySQLPackageInf curMSQLPackgInf)throws IOException{ + switch(curMSQLPackgInf.pkgType){ + case MySQLPacket.OK_PACKET: + case MySQLPacket.ERROR_PACKET: + case MySQLPacket.EOF_PACKET: + return true; + default: + return false; + } + } @Override public void clearResouces(boolean sessionCLosed) { - // nothint to do - + // TODO Auto-generated method stub + } } diff --git a/source/src/main/java/io/mycat/mycat2/cmds/QueryCmdProcessImpl.java b/source/src/main/java/io/mycat/mycat2/cmds/QueryCmdProcessImpl.java new file mode 100644 index 0000000..e9263c7 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/QueryCmdProcessImpl.java @@ -0,0 +1,76 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.cmds.query.DefaultQuerySqlProcessImpl; +import io.mycat.mycat2.cmds.query.QuerySQLProcessInf; +import io.mycat.mycat2.sqlparser.NewSQLContext; +import io.mycat.mycat2.sqlparser.NewSQLParser; +import io.mycat.mysql.packet.MySQLPacket; + +/** + * 进行查询命令3处理 + * + * @since 2017年8月15日 下午6:12:18 + * @version 0.0.1 + * @author liujun + */ +public class QueryCmdProcessImpl implements SQLComandProcessInf { + + private static Logger logger = LoggerFactory.getLogger(QueryCmdProcessImpl.class); + + private NewSQLContext sqlContext = new NewSQLContext(); + + private NewSQLParser sqlParser = new NewSQLParser(); + + /** + * 查询命令工厂方法实例对象 + */ + public static final QueryCmdProcessImpl INSTANCE = new QueryCmdProcessImpl(); + + /** + * 分SQL进行实现 + */ + private static final Map QUERY_MAP = new HashMap<>(); + + static { + QUERY_MAP.put(NewSQLContext.SHOW_SQL, DefaultQuerySqlProcessImpl.INSTANCE); + QUERY_MAP.put(NewSQLContext.SET_SQL, DefaultQuerySqlProcessImpl.INSTANCE); + QUERY_MAP.put(NewSQLContext.SELECT_SQL, null); + QUERY_MAP.put(NewSQLContext.INSERT_SQL, null); + QUERY_MAP.put(NewSQLContext.UPDATE_SQL, null); + QUERY_MAP.put(NewSQLContext.DELETE_SQL, null); + } + + @Override + public void commandProc(MySQLSession session) throws IOException { + + byte[] sql = session.frontBuffer.getBytes( + session.curFrontMSQLPackgInf.startPos + MySQLPacket.packetHeaderSize + 1, + session.curFrontMSQLPackgInf.pkgLength - MySQLPacket.packetHeaderSize - 1); + sqlParser.parse(sql, sqlContext); + if (sqlContext.hasAnnotation()) { + // 此处添加注解处理 + } + + QuerySQLProcessInf querySqlProc = null; + + for (int i = 0; i < sqlContext.getSQLCount(); i++) { + querySqlProc = QUERY_MAP.get(sqlContext.getSQLType(i)); + + if (null != querySqlProc) { + querySqlProc.querySqlProc(session, sqlContext); + } else { + DefaultQuerySqlProcessImpl.INSTANCE.querySqlProc(session, sqlContext); + } + } + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/SQLComandProcessInf.java b/source/src/main/java/io/mycat/mycat2/cmds/SQLComandProcessInf.java new file mode 100644 index 0000000..f387320 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/SQLComandProcessInf.java @@ -0,0 +1,24 @@ +package io.mycat.mycat2.cmds; + +import java.io.IOException; + +import io.mycat.mycat2.MySQLSession; + +/** + * 进行命令处理的接口 + * + * @since 2017年8月15日 下午6:10:39 + * @version 0.0.1 + * @author liujun + */ +public interface SQLComandProcessInf { + + /** + * 进行命令处理的接口 + * + * @param session + * 会话信息 + */ + public void commandProc(MySQLSession session) throws IOException; + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/query/DefaultQuerySqlProcessImpl.java b/source/src/main/java/io/mycat/mycat2/cmds/query/DefaultQuerySqlProcessImpl.java new file mode 100644 index 0000000..0a56ede --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/query/DefaultQuerySqlProcessImpl.java @@ -0,0 +1,29 @@ +package io.mycat.mycat2.cmds.query; + +import java.io.IOException; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.sqlparser.NewSQLContext; + +/** + * 默认的查询SQL处理 + * + * @since 2017年8月15日 下午6:29:45 + * @version 0.0.1 + * @author liujun + */ +public class DefaultQuerySqlProcessImpl implements QuerySQLProcessInf { + + /** + * 工厂方法实例 + */ + public static final DefaultQuerySqlProcessImpl INSTANCE = new DefaultQuerySqlProcessImpl(); + + @Override + public void querySqlProc(MySQLSession session, NewSQLContext sqlContext) throws IOException { + if (session.curSQLCommand.procssSQL(session, false)) { + session.curSQLCommand.clearResouces(false); + } + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/cmds/query/QuerySQLProcessInf.java b/source/src/main/java/io/mycat/mycat2/cmds/query/QuerySQLProcessInf.java new file mode 100644 index 0000000..c5f7ec4 --- /dev/null +++ b/source/src/main/java/io/mycat/mycat2/cmds/query/QuerySQLProcessInf.java @@ -0,0 +1,29 @@ +package io.mycat.mycat2.cmds.query; + +import java.io.IOException; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.sqlparser.NewSQLContext; + +/** + * 进行查询SQL的处理 + * + * @since 2017年8月15日 下午6:23:25 + * @version 0.0.1 + * @author liujun + */ +public interface QuerySQLProcessInf { + + /** + * 进行查询的SQL处理 + * + * @param session + * 会话 + * @param sqlContext + * sql处理的上下文对象 + * @throws IOException + * 异常 + */ + public void querySqlProc(MySQLSession session, NewSQLContext sqlContext) throws IOException; + +} diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java index ed8a2cb..de809a2 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java @@ -4,6 +4,7 @@ import java.nio.channels.SocketChannel; import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MySQLSession.CurrPacketType; import io.mycat.mycat2.beans.MySQLPackageInf; import io.mycat.proxy.DefaultDirectProxyHandler; import io.mycat.proxy.ProxyBuffer; @@ -23,11 +24,12 @@ public void onFrontRead(MySQLSession session) throws IOException { ProxyBuffer peerBuf = session.frontBuffer; SocketChannel peerChannel = session.backendChannel; MySQLPackageInf curPkgInf = session.curFrontMSQLPackgInf; - if (readed == false || session.resolveMySQLPackage(peerBuf, curPkgInf,true) == false) { + if (readed == false || + CurrPacketType.Full!=session.resolveMySQLPackage(peerBuf, curPkgInf, true)) { return; } - if (peerBuf.readState.hasRemain()) { + if (peerBuf.hasRemain()) { logger.warn("front received multi packages !!!!! "); processAllRemainPkg(session, peerBuf, curPkgInf); @@ -37,17 +39,17 @@ public void onFrontRead(MySQLSession session) throws IOException { peerBuf.flip(); session.writeToChannel(peerBuf, peerChannel); return; - } private void processAllRemainPkg(MySQLSession session, ProxyBuffer theBuf, MySQLPackageInf curPkgInf) throws IOException { int pkgIndex = 2; - while (theBuf.readState.hasRemain() && session.resolveMySQLPackage(theBuf, curPkgInf,true) != false) { + while (theBuf.hasRemain() && + CurrPacketType.Full==session.resolveMySQLPackage(theBuf, curPkgInf, true)) { logger.info(" parsed No." + pkgIndex + " package ,type " + curPkgInf.pkgType + " len " + curPkgInf.pkgLength); pkgIndex++; } - if (theBuf.readState.hasRemain()) { + if (theBuf.hasRemain()) { logger.warn("has half package remains "); } @@ -59,11 +61,12 @@ public void onBackendRead(MySQLSession session) throws IOException { ProxyBuffer peerBuf = session.frontBuffer; SocketChannel peerChannel = session.frontChannel; MySQLPackageInf curPkgInf = session.curBackendMSQLPackgInf; - if (readed == false || session.resolveMySQLPackage(peerBuf, curPkgInf,true) == false) { + if (readed == false || + CurrPacketType.Full != session.resolveMySQLPackage(peerBuf, curPkgInf, true)) { return; } - if (peerBuf.readState.hasRemain()) { + if (peerBuf.hasRemain()) { logger.warn("backend received multi packages !!!!! "); processAllRemainPkg(session, peerBuf, curPkgInf); diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java index 6a949e5..5b9c210 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java @@ -2,25 +2,27 @@ import java.io.IOException; import java.net.InetSocketAddress; -import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; -import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; -import io.mycat.mycat2.sqlparser.NewSQLContext; -import io.mycat.mycat2.sqlparser.NewSQLParser; -import io.mycat.mysql.packet.MySQLPacket; -import io.mycat.mycat2.beans.MySQLDataSource; -import io.mycat.proxy.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MySQLSession.CurrPacketType; +import io.mycat.mycat2.beans.MySQLDataSource; +import io.mycat.mycat2.cmds.QueryCmdProcessImpl; +import io.mycat.mycat2.cmds.SQLComandProcessInf; import io.mycat.mycat2.tasks.BackendConCreateTask; import io.mycat.mycat2.tasks.BackendSynchronzationTask; import io.mycat.mysql.packet.ErrorPacket; +import io.mycat.mysql.packet.MySQLPacket; +import io.mycat.proxy.BackendIOHandler; +import io.mycat.proxy.FrontIOHandler; +import io.mycat.proxy.ProxyBuffer; +import io.mycat.proxy.UserProxySession; /** * 负责MycatSession的NIO事件,驱动SQLCommand命令执行,完成SQL的处理过程 @@ -31,40 +33,44 @@ public class DefaultMycatSessionHandler implements FrontIOHandler, BackendIOHandler { public static final DefaultMycatSessionHandler INSTANCE = new DefaultMycatSessionHandler(); private static Logger logger = LoggerFactory.getLogger(DefaultMycatSessionHandler.class); - private NewSQLContext sqlContext = new NewSQLContext(); - private NewSQLParser sqlParser = new NewSQLParser(); /** - * 进行特殊包处理的包 + * 进行特殊包处理的容器 */ private static final Map PKGMAP = new HashMap<>(); + /** + * 进行SQL命令的处理的容器 + */ + private static final Map SQLCOMMANDMAP = new HashMap<>(); + static { // 进行load data命令处理类 PKGMAP.put((int) (byte) 0xfb, LoadDataHandler.INSTANCE); + + // 进行SQL命令容器对象信息添加 + SQLCOMMANDMAP.put(MySQLPacket.COM_QUERY, QueryCmdProcessImpl.INSTANCE); } @Override public void onFrontRead(final MySQLSession session) throws IOException { boolean readed = session.readFromChannel(session.frontBuffer, session.frontChannel); ProxyBuffer buffer = session.frontBuffer; - if (readed == false) { + if (readed == false|| + // 没有读到完整报文 + CurrPacketType.Full != session.resolveMySQLPackage(buffer, session.curFrontMSQLPackgInf, false)) { return; } - if (session.resolveMySQLPackage(buffer, session.curFrontMSQLPackgInf, false) == false) { - // 没有读到完整报文 - return; - } - if (session.curFrontMSQLPackgInf.endPos < buffer.getReadOptState().optLimit) { + if (session.curFrontMSQLPackgInf.endPos < buffer.writeIndex) { logger.warn("front contains multi package "); } if (session.backendChannel == null) { // todo ,从连接池中获取连接,获取不到后创建新连接, - final MySQLDataSource datas = session.getDatasource(); + final MySQLDataSource ds = session.getDatasource(); logger.info("hang cur sql for backend connection ready "); - String serverIP = datas.getConfig().getIp(); - int serverPort = datas.getConfig().getPort(); + String serverIP = ds.getConfig().getIp(); + int serverPort = ds.getConfig().getPort(); InetSocketAddress serverAddress = new InetSocketAddress(serverIP, serverPort); session.backendChannel = SocketChannel.open(); session.backendChannel.configureBlocking(false); @@ -74,7 +80,7 @@ public void onFrontRead(final MySQLSession session) throws IOException { session.backendKey = selectKey; logger.info("Connecting to server " + serverIP + ":" + serverPort); - BackendConCreateTask authProcessor = new BackendConCreateTask(session, null); + BackendConCreateTask authProcessor = new BackendConCreateTask(session, ds); authProcessor.setCallback((optSession, Sender, exeSucces, retVal) -> { if (exeSucces) { // 认证成功后开始同步会话状态至后端 @@ -90,51 +96,19 @@ public void onFrontRead(final MySQLSession session) throws IOException { } else { // 如果是 SQL 则调用 sql parser 进行处理 + SQLComandProcessInf sqlCmd = SQLCOMMANDMAP.get(session.curFrontMSQLPackgInf.pkgType); - if (session.curFrontMSQLPackgInf.pkgType == MySQLPacket.COM_QUERY) { - byte[] sql = session.frontBuffer.getBytes(session.curFrontMSQLPackgInf.startPos + MySQLPacket.packetHeaderSize + 1, session.curFrontMSQLPackgInf.endPos - MySQLPacket.packetHeaderSize - 1); - sqlParser.parse(sql, sqlContext); - if (sqlContext.hasAnnotation()) { - //此处添加注解处理 - } - for (int i = 0; i < sqlContext.getSQLCount(); i++) { - switch (sqlContext.getSQLType(i)) { - case NewSQLContext.SHOW_SQL: - logger.info("SHOW_SQL : "+(new String(sql, StandardCharsets.UTF_8))); - sendSqlCommand(session); - break; - case NewSQLContext.SET_SQL: - logger.info("SET_SQL : "+(new String(sql, StandardCharsets.UTF_8))); - sendSqlCommand(session); - break; - case NewSQLContext.SELECT_SQL: - case NewSQLContext.INSERT_SQL: - case NewSQLContext.UPDATE_SQL: - case NewSQLContext.DELETE_SQL: - logger.info("Parse SQL : "+(new String(sql, StandardCharsets.UTF_8))); - String tbls = ""; - for (int j=0; j { @@ -152,31 +126,12 @@ private void syncSessionStateToBackend(MySQLSession mySQLSession) throws IOExcep } public void onBackendRead(MySQLSession session) throws IOException { - boolean readed = session.readFromChannel(session.frontBuffer, session.backendChannel); - if (readed == false) { - return; - } - - ProxyBuffer backendBuffer = session.frontBuffer; - - if (session.resolveMySQLPackage(backendBuffer, session.curFrontMSQLPackgInf, false) == false) { - // 没有读到完整报文 - return; - } // 交给SQLComand去处理 if (session.curSQLCommand.procssSQL(session, true)) { session.curSQLCommand.clearResouces(false); } - // 检查当前的包是否需要进行特殊的处理 - DefaultMycatSessionHandler handler = PKGMAP.get(session.curFrontMSQLPackgInf.pkgType); - - if (null != handler) { - // 设置lodata的透传执行 - session.setCurNIOHandler(handler); - } - } @Override @@ -191,7 +146,7 @@ public void onBackendConnect(MySQLSession userSession, boolean success, String m * @param normal */ public void onFrontSocketClosed(MySQLSession userSession, boolean normal) { - userSession.lazyCloseSession(normal,"front closed"); + userSession.lazyCloseSession(normal, "front closed"); } @@ -202,7 +157,7 @@ public void onFrontSocketClosed(MySQLSession userSession, boolean normal) { * @param normal */ public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { - userSession.lazyCloseSession(normal,"backend closed "); + userSession.lazyCloseSession(normal, "backend closed "); } /** @@ -219,13 +174,12 @@ protected void onSocketException(UserProxySession userSession, Exception excepti } else { logger.warn("DefaultSQLHandler handle IO error " + userSession.sessionInfo(), exception); } - userSession.close(false,"exception:" + exception.getMessage()); + userSession.close(false, "exception:" + exception.getMessage()); } @Override public void onFrontWrite(MySQLSession session) throws IOException { session.writeToChannel(session.frontBuffer, session.frontChannel); - } @Override diff --git a/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java b/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java index d0677e1..b600947 100644 --- a/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java @@ -42,7 +42,7 @@ public void onFrontRead(final MySQLSession session) throws IOException { if (readed == false) { return; } - if (session.curFrontMSQLPackgInf.endPos < backendBuffer.getReadOptState().optLimit) { + if (session.curFrontMSQLPackgInf.endPos < backendBuffer.writeIndex) { logger.warn("front contains multi package "); } @@ -91,7 +91,7 @@ public boolean transLoadData(MySQLSession session, boolean backresReceived) thro return true; } // 当前的buffer被写完之后,需要做清空处理 - if (curBuffer.writeState.optPostion == curBuffer.getBuffer().position()) { + if (curBuffer.writeIndex == curBuffer.getBuffer().position()) { curBuffer.reset(); } @@ -110,14 +110,14 @@ private void readOverByte(ProxyBuffer curBuffer) { // 如果数据的长度超过了,结束符的长度,可直接提取结束符 if (buffer.position() >= FLAGLENGTH) { - int opts = curBuffer.readState.optLimit; + int opts = curBuffer.readIndex; buffer.position(opts - FLAGLENGTH); buffer.get(overFlag, 0, FLAGLENGTH); buffer.position(opts); } // 如果小于结束符,说明需要进行两个byte数组的合并 else { - int opts = curBuffer.readState.optLimit; + int opts = curBuffer.readIndex; // 计算放入的位置 int moveSize = FLAGLENGTH - opts; int index = 0; diff --git a/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java b/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java index 5ed77de..78224b8 100644 --- a/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java @@ -3,6 +3,7 @@ import java.io.IOException; import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.MySQLSession.CurrPacketType; import io.mycat.mysql.packet.AuthPacket; import io.mycat.proxy.DefaultDirectProxyHandler; import io.mycat.proxy.ProxyBuffer; @@ -22,15 +23,16 @@ public class MySQLClientAuthHandler extends DefaultDirectProxyHandler * 同步状态至后端数据库,包括:字符集,事务,隔离级别等 */ public class BackendSynchronzationTask extends AbstractBackendIOTask { - private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); - private static QueryPacket[] CMDS = new QueryPacket[3]; - private int processCmd = 0; - - static { - QueryPacket isolationSynCmd = new QueryPacket(); - isolationSynCmd.packetId = 0; - - QueryPacket charsetSynCmd = new QueryPacket(); - charsetSynCmd.packetId = 0; - - QueryPacket transactionSynCmd = new QueryPacket(); - transactionSynCmd.packetId = 0; - - CMDS[0] = isolationSynCmd; - CMDS[1] = charsetSynCmd; - CMDS[2] = transactionSynCmd; - } - - public BackendSynchronzationTask(MySQLSession session) throws IOException { - super(session); - this.processCmd = 0; - syncState(session); - } - - private void syncState(MySQLSession session) throws IOException { - logger.info("synchronzation state to bakcend.session=" + session.toString()); - ProxyBuffer frontBuffer = session.frontBuffer; - frontBuffer.reset(); - // TODO 字符集映射和前端事务设置还未完成,这里只用隔离级别模拟实现(其实都是SET xxx效果一样),回头补充 - switch (processCmd) { - case 1: - case 2: - case 0: - CMDS[processCmd].sql = session.isolation.getCmd(); - CMDS[processCmd].write(frontBuffer); + private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); - frontBuffer.flip(); - session.writeToChannel(frontBuffer, session.backendChannel); - processCmd++; - break; - default: - this.finished(true); - break; - } + public BackendSynchronzationTask(MySQLSession session) throws IOException { + super(session); + syncState(session); + } - } + private void syncState(MySQLSession session) throws IOException { + logger.info("synchronzation state to bakcend.session=" + session.toString()); + ProxyBuffer frontBuffer = session.frontBuffer; + frontBuffer.reset(); + // TODO 字符集映射未完成 + QueryPacket queryPacket = new QueryPacket(); + queryPacket.packetId = 0; + queryPacket.sql = session.isolation.getCmd() + session.autoCommit.getCmd() + session.isolation.getCmd(); + queryPacket.write(frontBuffer); + frontBuffer.flip(); + frontBuffer.readIndex = frontBuffer.writeIndex; + session.writeToChannel(frontBuffer, session.backendChannel); + } - @Override - public void onBackendRead(MySQLSession session) throws IOException { - session.frontBuffer.reset(); - if (!session.readFromChannel(session.frontBuffer, session.backendChannel) - || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 - return; - } - if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { - syncState(session); - } else { - // TODO 同步失败如何处理??是否应该关闭此连接?? - errPkg = new ErrorPacket(); - errPkg.read(session.frontBuffer); - logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); - this.finished(false); - } - } + @Override + public void onBackendRead(MySQLSession session) throws IOException { + session.frontBuffer.reset(); + if (!session.readFromChannel(session.frontBuffer, session.backendChannel) + || CurrPacketType.Full != session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 + return; + } + if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { + session.frontBuffer.reset(); + this.finished(true); + } else { + errPkg = new ErrorPacket(); + errPkg.read(session.frontBuffer); + logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); + this.finished(false); + } + } - @Override - public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { - logger.warn(" socket closed not handlerd" + session.toString()); - } + @Override + public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { + logger.warn(" socket closed not handlerd" + session.toString()); + } } diff --git a/source/src/main/java/io/mycat/mysql/AutoCommit.java b/source/src/main/java/io/mycat/mysql/AutoCommit.java new file mode 100644 index 0000000..c9d1b1f --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/AutoCommit.java @@ -0,0 +1,19 @@ +package io.mycat.mysql; + +/** + * Created by ynfeng on 2017/8/19. + */ +public enum AutoCommit { + ON("SET autocommit = 1;"), + OFF("SET autocommit = 0;"); + + AutoCommit(String cmd) { + this.cmd = cmd; + } + + private String cmd; + + public String getCmd() { + return cmd; + } +} diff --git a/source/src/main/java/io/mycat/mysql/Isolation.java b/source/src/main/java/io/mycat/mysql/Isolation.java index b382f4b..277063f 100644 --- a/source/src/main/java/io/mycat/mysql/Isolation.java +++ b/source/src/main/java/io/mycat/mysql/Isolation.java @@ -44,9 +44,5 @@ public enum Isolation { public String getCmd() { return cmd; } - - public void setCmd(String cmd) { - this.cmd = cmd; - } } diff --git a/source/src/main/java/io/mycat/mysql/packet/ErrorPacket.java b/source/src/main/java/io/mycat/mysql/packet/ErrorPacket.java index f6aa180..f5e2aee 100644 --- a/source/src/main/java/io/mycat/mysql/packet/ErrorPacket.java +++ b/source/src/main/java/io/mycat/mysql/packet/ErrorPacket.java @@ -60,7 +60,7 @@ public void read(ProxyBuffer byteBuffer) { packetId =byteBuffer.readByte(); pkgType =byteBuffer.readByte(); errno = (int) byteBuffer.readFixInt(2); - if (byteBuffer.readState.hasRemain() && (byteBuffer.getByte(byteBuffer.readState.optPostion) == SQLSTATE_MARKER)) { + if ((byteBuffer.writeIndex - byteBuffer.readIndex) >0 && (byteBuffer.getByte(byteBuffer.readIndex) == SQLSTATE_MARKER)) { byteBuffer.skip(1); sqlState = byteBuffer.readBytes(5); } diff --git a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java index 8b4ec92..9c80e59 100644 --- a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java +++ b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java @@ -1,44 +1,90 @@ package io.mycat.proxy; +import java.nio.ByteBuffer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * 可重用的Buffer,连续读或者写,当空间不够时Compact擦除之前用过的空间, * 处于写状态或者读状态之一,不能同时读写, 只有数据被操作完成(读完或者写完)后State才能被改变(flip方法或手工切换状态),同时可能要改变Owner,chanageOwn + * + * 需要外部 关心的状态为 + * writeIndex 写入buffer 开始位置 + * readIndex 读取开始位置 + * inReading 当前buffer 读写状态 + * frontUsing owner + * 不需要外部关心的状态为 + * readMark 向channel 中写入数据时的开始位置, 该状态由 writeToChannel 自动维护,不需要外部显式指定 + * preUsing 上一个owner 仅在 write==0 或只写了一部分数据的情况下,需要临时改变 owner .本次写入完成后,需要根据preUsing 自动切换回来 + * 使用流程 + * 一、透传、只前端读写、只后端读写场景 + * 1. 从channel 向 buffer 写入数据 始终从 writeIndex 开始写入 , inReading 状态一定为 false 写入状态 + * 2. 读取 buffer 中数据 读取的数据范围是 readIndex --- writeIndex 之间的数据. + * 3. 向 channel 写入数据前, flip 切换读写状态 + * 4. 数据全部透传完成(例如:整个结果集透传完成)后 changeOwner,否则 owner 不变. + * 5. 从 buffer 向 channel 写入数据时,写入 readMark--readIndex 之间的数据. + * 6. 写完成后 flip 切换读写状态。同时 如果 readIndex > buffer.capacity() * 2 / 3 进行一次压缩 + * 7. 从 channel 向buffer 写入数据时,如果 writeIndex > buffer.capacity() * 1 / 3 进行一次压缩 + * + * 二、没有读取数据,向buffer中写入数据后 直接 write 到 channel的场景 + * 1. 在写入到 channel 时 ,需要显式 指定 readIndex = writeIndex; + * 2. 其他步骤 同 (透传、只前端读写、只后端读写场景)场景 + * + * @author yanjunli + * */ -import java.nio.ByteBuffer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - public class ProxyBuffer { protected static Logger logger = LoggerFactory.getLogger(ProxyBuffer.class); private final ByteBuffer buffer; + // 对于Write to Buffer + // 的操作,writeIndex表示当前可读的数据截止位置,readMark为数据开始位置,用户可以标记,下次写入Buffer时,从writeIndex的位置继续写入 + + // 对于Read to + // Channel的操作,readMark表示读取数据截止的位置,writeIndex表示数据截止的位置,0-readMark之间的数据是要被写入CHannel的,写入Channel + // 后,Write 完成后检查optMark;当writeIndex > 1/3 capacity 时 执行 compact。将readIndex 与 writeIndex 之间的数据移动到buffer 开始位置 + + /* + * 写入数据开始位置, 从socket buffer 写入数据时,从该位置开始写入 + */ + public int writeIndex; + + /* + * 用于标记透传时 的结束位置 + * 向socket 中吸入数据时,用于标记写入到socket 中数据的结束位置 + */ + public int readIndex; /** - * 通道的读写状态标标识,false为写入,true 为读取 + * 当前buffer 的读写状态 . 默认为写入状态 + * false 从 channel 中 向 buffer 写入数据 即: write 状态 + * true 从buffer 向 channel 写出数据 即: read 状态 */ private boolean inReading = false; + //一般都是后端连接率先给客户端发起信息,所以后端默认使用 + private boolean frontUsing=false; - /** - * 通道读取的状态标识 - */ - public BufferOptState readState = new BufferOptState(); + /* + * ********************************************************************* + * 以下 两个状态 ,不推荐 外部使用. 会在进行网络读写时,网络读写程序内部使用. + * ********************************************************************* + */ - /** - * 通道数据写入状态的标识 + /* + * 用于标记透传时数据开始位置, 这个指针不推荐外部用户 显示调用,由网络读写状态自动管理 + * 为减少 compact 次数. 引入第三个指针. 在 writeIndex > 2/3 capacity 时 执行 compact */ - public BufferOptState writeState = new BufferOptState(); - //一般都是后端连接率先给客户端发起信息,所以后端默认使用 - private boolean frontUsing=false; + public int readMark; + + // 上一个owner, 外部用户 不需要关心 该值 + // 在发生 网络阻塞情况下,可能只写出去了 0个字节. + // 这时需要 切换 使用者,同时保存当前使用者.用于 数据传输完毕后,切换回来 + private Boolean preUsing=null; public ProxyBuffer(ByteBuffer buffer) { super(); this.buffer = buffer; - writeState.startPos = 0; - writeState.optPostion = 0; - writeState.optLimit = buffer.capacity(); } public boolean isInReading() { @@ -48,6 +94,14 @@ public boolean isInReading() { public boolean isInWriting() { return inReading == false; } + + /** + * 当前还是数据可读 + * @return + */ + public boolean hasRemain(){ + return (writeIndex - readIndex) > 0; + } /** * 需要谨慎使用,调用者需要清除当前Buffer所处的状态!! @@ -58,45 +112,27 @@ public ByteBuffer getBuffer() { return buffer; } - public void setInReading(boolean inReading) { - this.inReading = inReading; - } - - public void changeOwner(boolean front) + public ProxyBuffer changeOwner(boolean front) { this.frontUsing=front; + return this; } - /** - * 交换Read与Write状态 - */ - public void flip() { - - if (this.inReading) { - // 转为可写状态 - inReading = false; - writeState.startPos = 0; - writeState.optPostion = 0; - writeState.optLimit = buffer.capacity(); - writeState.optedTotalLength = 0; - // 转为可写状态时恢复读状态为初始(不可读) - readState.startPos = 0; - readState.optPostion = 0; - readState.optLimit = 0; - } else { - // 转为读状态 - inReading = true; - //读取状态的开始指针指定为写入状态的开始指针 - readState.startPos = writeState.startPos; - //opt指针指定为状态开始的指针 - readState.optPostion = writeState.startPos; - //读取的最大长度指定为现写入的长度 - readState.optLimit = writeState.optPostion; - //总字节数转换为0 - readState.optedTotalLength = 0; - } - logger.debug("flip, new state {} , write state: {} ,read state {}", this.inReading ? "read" : "write", - this.writeState, this.readState); - + + public boolean frontUsing() { + return this.frontUsing; + } + + public boolean backendUsing() + { + return !frontUsing; + } + + public void reset() + { + this.readMark=0; + this.readIndex=0; + this.writeIndex=0; + this.buffer.clear(); } /** @@ -105,37 +141,31 @@ public void flip() { * @param step */ public void skip(int step) { - this.readState.optPostion += step; - } - - public BufferOptState getReadOptState() { - return this.readState; - } - - public BufferOptState getWriteOptState() { - return this.writeState; + readIndex += step; } /** * 写状态时候,如果数据写满了,可以调用此方法移除之前的旧数据 */ - public void compact(boolean synReadStatePos) { - if (this.inReading) { - throw new RuntimeException("not in writing state ,can't Compact"); - } - this.buffer.position(writeState.startPos); - this.buffer.limit(writeState.optPostion); + public void compact() { + + this.buffer.position(readMark); + this.buffer.limit(writeIndex); this.buffer.compact(); - int offset = writeState.startPos; - writeState.startPos = 0; - writeState.optPostion = buffer.limit(); - writeState.optLimit = buffer.capacity(); - buffer.limit(buffer.capacity()); - if (synReadStatePos) { - readState.optPostion -= offset; - readState.optLimit -= offset; + readIndex -= readMark; + readMark = 0; + writeIndex = buffer.position(); + } + + /** + * 交换Read与Write状态 + */ + public void flip() { + if (this.inReading) { + this.inReading = false; + }else{ + this.inReading = true; } - } public ProxyBuffer writeBytes(byte[] bytes) { @@ -144,31 +174,30 @@ public ProxyBuffer writeBytes(byte[] bytes) { } public long readFixInt(int length) { - long val = getInt(readState.optPostion, length); - this.readState.optPostion += length; + long val = getInt(readIndex, length); + readIndex += length; return val; } public long readLenencInt() { - int index = readState.optPostion; - long len = getInt(readState.optPostion, 1) & 0xff; + int index = readIndex; + long len = getInt(index, 1) & 0xff; if (len < 251) { - this.readState.optPostion += 1; + readIndex += 1; return getInt(index, 1); } else if (len == 0xfc) { - this.readState.optPostion += 2; + readIndex += 2; return getInt(index + 1, 2); } else if (len == 0xfd) { - this.readState.optPostion += 3; + readIndex += 3; return getInt(index + 1, 3); } else { - this.readState.optPostion += 8; + readIndex += 8; return getInt(index + 1, 8); } } public long getInt(int index, int length) { - buffer.limit(index + length); buffer.position(index); long rv = 0; for (int i = 0; i < length; i++) { @@ -179,7 +208,6 @@ public long getInt(int index, int length) { } public byte[] getBytes(int index, int length) { - buffer.limit(length + index); buffer.position(index); byte[] bytes = new byte[length]; buffer.get(bytes); @@ -187,7 +215,6 @@ public byte[] getBytes(int index, int length) { } public byte getByte(int index) { - buffer.limit(index + 1); buffer.position(index); byte b = buffer.get(); return b; @@ -199,8 +226,8 @@ public String getFixString(int index, int length) { } public String readFixString(int length) { - byte[] bytes = getBytes(readState.optPostion, length); - readState.optPostion += length; + byte[] bytes = getBytes(readIndex, length); + readIndex += length; return new String(bytes); } @@ -212,10 +239,10 @@ public String getLenencString(int index) { } public String readLenencString() { - int strLen = (int) getLenencInt(readState.optPostion); + int strLen = (int) getLenencInt(readIndex); int lenencLen = getLenencLength(strLen); - byte[] bytes = getBytes(readState.optPostion + lenencLen, strLen); - this.readState.optPostion += strLen + lenencLen; + byte[] bytes = getBytes(readIndex + lenencLen, strLen); + readIndex += strLen + lenencLen; return new String(bytes); } @@ -230,7 +257,7 @@ public String readVarString(int length) { public String getNULString(int index) { int strLength = 0; int scanIndex = index; - while (scanIndex < readState.optLimit) { + while (scanIndex < writeIndex) { if (getByte(scanIndex++) == 0) { break; } @@ -241,8 +268,8 @@ public String getNULString(int index) { } public String readNULString() { - String rv = getNULString(readState.optPostion); - readState.optPostion += rv.getBytes().length + 1; + String rv = getNULString(readIndex); + readIndex += rv.getBytes().length + 1; return rv; } @@ -256,8 +283,8 @@ public ProxyBuffer putFixInt(int index, int length, long val) { } public ProxyBuffer writeFixInt(int length, long val) { - putFixInt(writeState.optPostion, length, val); - writeState.optPostion += length; + putFixInt(writeIndex, length, val); + writeIndex += length; return this; } @@ -279,19 +306,19 @@ public ProxyBuffer putLenencInt(int index, long val) { public ProxyBuffer writeLenencInt(long val) { if (val < 251) { - putByte(writeState.optPostion++, (byte) val); + putByte(writeIndex++, (byte) val); } else if (val >= 251 && val < (1 << 16)) { - putByte(writeState.optPostion++, (byte) 0xfc); - putFixInt(writeState.optPostion, 2, val); - writeState.optPostion += 2; + putByte(writeIndex++, (byte) 0xfc); + putFixInt(writeIndex, 2, val); + writeIndex += 2; } else if (val >= (1 << 16) && val < (1 << 24)) { - putByte(writeState.optPostion++, (byte) 0xfd); - putFixInt(writeState.optPostion, 3, val); - writeState.optPostion += 3; + putByte(writeIndex++, (byte) 0xfd); + putFixInt(writeIndex, 3, val); + writeIndex += 3; } else { - putByte(writeState.optPostion++, (byte) 0xfe); - putFixInt(writeState.optPostion, 8, val); - writeState.optPostion += 8; + putByte(writeIndex++, (byte) 0xfe); + putFixInt(writeIndex, 8, val); + writeIndex += 8; } return this; } @@ -302,8 +329,8 @@ public ProxyBuffer putFixString(int index, String val) { } public ProxyBuffer writeFixString(String val) { - putBytes(writeState.optPostion, val.getBytes()); - writeState.optPostion += val.getBytes().length; + putBytes(writeIndex, val.getBytes()); + writeIndex += val.getBytes().length; return this; } @@ -315,9 +342,9 @@ public ProxyBuffer putLenencString(int index, String val) { } public ProxyBuffer writeLenencString(String val) { - putLenencString(writeState.optPostion, val); + putLenencString(writeIndex, val); int lenencLen = getLenencLength(val.getBytes().length); - writeState.optPostion += lenencLen + val.getBytes().length; + writeIndex += lenencLen + val.getBytes().length; return this; } @@ -336,14 +363,12 @@ public ProxyBuffer putBytes(int index, byte[] bytes) { } public ProxyBuffer putBytes(int index, int length, byte[] bytes) { - buffer.limit(index + length); buffer.position(index); buffer.put(bytes); return this; } public ProxyBuffer putByte(int index, byte val) { - buffer.limit(index + 1); buffer.position(index); buffer.put(val); return this; @@ -356,40 +381,40 @@ public ProxyBuffer putNULString(int index, String val) { } public ProxyBuffer writeNULString(String val) { - putNULString(writeState.optPostion, val); - writeState.optPostion += val.getBytes().length + 1; + putNULString(writeIndex, val); + writeIndex += val.getBytes().length + 1; return this; } public byte[] readBytes(int length) { - byte[] bytes = this.getBytes(readState.optPostion, length); - readState.optPostion += length; + byte[] bytes = this.getBytes(readIndex, length); + readIndex += length; return bytes; } public ProxyBuffer writeBytes(int length, byte[] bytes) { - this.putBytes(writeState.optPostion, length, bytes); - writeState.optPostion += length; + this.putBytes(writeIndex, length, bytes); + writeIndex += length; return this; } public ProxyBuffer writeLenencBytes(byte[] bytes) { - putLenencInt(writeState.optPostion, bytes.length); + putLenencInt(writeIndex, bytes.length); int offset = getLenencLength(bytes.length); - putBytes(writeState.optPostion + offset, bytes); - writeState.optPostion += offset + bytes.length; + putBytes(writeIndex + offset, bytes); + writeIndex += offset + bytes.length; return this; } public ProxyBuffer writeByte(byte val) { - this.putByte(writeState.optPostion, val); - writeState.optPostion++; + this.putByte(writeIndex, val); + writeIndex++; return this; } public byte readByte() { - byte val = getByte(readState.optPostion); - readState.optPostion++; + byte val = getByte(readIndex); + readIndex++; return val; } @@ -431,9 +456,9 @@ public long getLenencInt(int index) { } public byte[] readLenencBytes() { - int len = (int) getLenencInt(readState.optPostion); - byte[] bytes = getBytes(readState.optPostion + getLenencLength(len), len); - readState.optPostion += getLenencLength(len) + len; + int len = (int) getLenencInt(readIndex); + byte[] bytes = getBytes(readIndex + getLenencLength(len), len); + readIndex += getLenencLength(len) + len; return bytes; } @@ -445,26 +470,22 @@ public ProxyBuffer putLenencBytes(int index, byte[] bytes) { } /** - * Reset to write状态,清除数据 + * 是否需要自动切换owner + * @return */ - public void reset() { - inReading = false; - writeState.optPostion = 0; - writeState.optLimit = buffer.capacity(); - writeState.curOptedLength = 0; - writeState.optedTotalLength = 0; - readState.optPostion = 0; - readState.optLimit = 0; - readState.curOptedLength = 0; - readState.optedTotalLength = 0; + public boolean needAutoChangeOwner() { + return (preUsing!=null&&frontUsing!=preUsing)?true:false; } - public boolean frontUsing() { - return this.frontUsing; - } - public boolean backendUsing() - { - return !frontUsing; + public ProxyBuffer setPreUsing(Boolean preUsing) { + this.preUsing = preUsing; + return this; } -} \ No newline at end of file + @Override + public String toString() { + return "ProxyBuffer [buffer=" + buffer + ", writeIndex=" + writeIndex + ", readIndex=" + readIndex + + ", readMark=" + readMark + ", inReading=" + inReading + ", frontUsing=" + frontUsing + ", preUsing=" + + preUsing + "]"; + } +} diff --git a/source/src/main/java/io/mycat/proxy/UserProxySession.java b/source/src/main/java/io/mycat/proxy/UserProxySession.java index 3a3c107..b3f93ec 100644 --- a/source/src/main/java/io/mycat/proxy/UserProxySession.java +++ b/source/src/main/java/io/mycat/proxy/UserProxySession.java @@ -36,21 +36,23 @@ public UserProxySession(BufferPool bufferPool, Selector selector, SocketChannel public boolean readFromChannel(ProxyBuffer proxyBuf, SocketChannel channel) throws IOException { ByteBuffer buffer = proxyBuf.getBuffer(); - buffer.limit(proxyBuf.writeState.optLimit); - buffer.position(proxyBuf.writeState.optPostion); + if (proxyBuf.writeIndex > buffer.capacity() * 1 / 3) { + proxyBuf.compact(); + }else{ + // buffer.position 在有半包没有参与透传时,会小于 writeIndex。 + // 大部分情况下 position == writeIndex + buffer.position(proxyBuf.writeIndex); + } + int readed = channel.read(buffer); - logger.debug(" readed {} total bytes ,channel {}", readed, channel); - proxyBuf.writeState.curOptedLength = readed; - if (readed > 0) { - proxyBuf.writeState.optPostion += readed; - proxyBuf.writeState.optedTotalLength += readed; - proxyBuf.readState.optLimit = proxyBuf.writeState.optPostion; - } else if (readed == -1) { + logger.debug(" readed {} total bytes ", readed); + if (readed == -1) { logger.warn("Read EOF ,socket closed "); throw new ClosedChannelException(); } else if (readed == 0) { logger.warn("readed zero bytes ,Maybe a bug ,please fix it !!!!"); } + proxyBuf.writeIndex = buffer.position(); return readed > 0; } @@ -60,33 +62,44 @@ public boolean readFromChannel(ProxyBuffer proxyBuf, SocketChannel channel) thro * @param channel */ public void writeToChannel(ProxyBuffer proxyBuf, SocketChannel channel) throws IOException { + ByteBuffer buffer = proxyBuf.getBuffer(); - BufferOptState readState = proxyBuf.readState; - BufferOptState writeState = proxyBuf.writeState; - buffer.position(readState.optPostion); - buffer.limit(readState.optLimit); + buffer.limit(proxyBuf.readIndex); + buffer.position(proxyBuf.readMark); int writed = channel.write(buffer); - readState.curOptedLength = writed; - readState.optPostion += writed; - readState.optedTotalLength += writed; - if (buffer.remaining() == 0) { - if (writeState.optPostion > buffer.position()) { - // 当前Buffer中写入的数据多于透传出去的数据,因此透传并未完成 - // compact buffer to head - buffer.limit(writeState.optPostion); - buffer.compact(); - readState.optPostion = 0; - readState.optLimit = buffer.position(); - writeState.optPostion = buffer.position(); - // 继续从对端Socket读数据 - - } else { - // 数据彻底写完,切换为读模式,对端读取数据 - proxyBuf.changeOwner(!proxyBuf.frontUsing()); - proxyBuf.flip(); - modifySelectKey(); + proxyBuf.readMark += writed; //记录本次磁轭如到 Channel 中的数据 + if(!buffer.hasRemaining()){ + logger.debug("writeToChannel write {} bytes ",writed); + // buffer 中需要透传的数据全部写入到 channel中后,会进入到当前分支.这时 readIndex == readLimit + if(proxyBuf.readMark != proxyBuf.readIndex){ + logger.error("writeToChannel has finished but readIndex != readLimit, please fix it !!!"); + } + if (proxyBuf.readIndex > buffer.capacity() * 2 / 3) { + proxyBuf.compact(); + }else{ + buffer.limit(buffer.capacity()); + } + //切换读写状态 + proxyBuf.flip(); + /* + * 如果需要自动切换owner,进行切换 + * 1. writed==0 或者 buffer 中数据没有写完时,注册可写事件 时,会进行owner 切换 注册写事件,完成后,需要自动切换回来 + */ + if(proxyBuf.needAutoChangeOwner()){ + proxyBuf.changeOwner(!proxyBuf.frontUsing()).setPreUsing(null); } + }else{ + /** + * 1. writed==0 或者 buffer 中数据没有写完时,注册可写事件 + * 通常发生在网络阻塞或者 客户端 COM_STMT_FETCH 命令可能会 出现没有写完或者 writed == 0 的情况 + */ + logger.debug("register OP_WRITE selectkey .write {} bytes. current channel is {}",writed,channel); + //需要切换 owner ,同时保存当前 owner 用于数据传输完成后,再切换回来 + // proxyBuf 读写状态不切换,会切换到相同的事件,不会重复注册 + proxyBuf.setPreUsing(proxyBuf.frontUsing()) + .changeOwner(!proxyBuf.frontUsing()); } + modifySelectKey(); } /** @@ -159,16 +172,27 @@ public void closeSocket(SocketChannel channel, boolean normal, String msg) { ((BackendIOHandler) getCurNIOHandler()).onBackendSocketClosed(this, normal); backendChannel = null; } - } public void modifySelectKey() throws ClosedChannelException { - if (frontKey != null && frontKey.isValid()) { - int clientOps = SelectionKey.OP_READ; + SelectionKey theKey = this.frontBuffer.frontUsing() ? frontKey : backendKey; + int clientOps = SelectionKey.OP_READ; + if (theKey != null && theKey.isValid()) { if (frontBuffer.isInWriting() == false) { clientOps = SelectionKey.OP_WRITE; } - frontKey.interestOps(clientOps); + int oldOps = theKey.interestOps(); + if (oldOps != clientOps) { + theKey.interestOps(clientOps); + } + } + logger.info(" current selectkey is {},channel is {}",theKey.interestOps(),theKey.channel()); + //取消对端 读写事件 + SelectionKey otherKey = this.frontBuffer.frontUsing() ? backendKey : frontKey; + if(otherKey!=null&&otherKey.isValid()){ + otherKey.interestOps(otherKey.interestOps() & ~(SelectionKey.OP_WRITE | SelectionKey.OP_READ)); + logger.info(" other selectkey is {},channel is {}",otherKey.interestOps(),otherKey.channel()); } + } } diff --git a/source/src/main/resources/datasource.xml b/source/src/main/resources/datasource.xml index 6198726..c1d1079 100644 --- a/source/src/main/resources/datasource.xml +++ b/source/src/main/resources/datasource.xml @@ -2,7 +2,7 @@ - +