Skip to content

Commit

Permalink
Merge pull request MyCATApache#1 from MyCATApache/master
Browse files Browse the repository at this point in the history
合拼主分支
  • Loading branch information
junwen12221 authored Aug 22, 2017
2 parents f48f67f + 80d7676 commit a1fe619
Show file tree
Hide file tree
Showing 22 changed files with 729 additions and 429 deletions.
29 changes: 20 additions & 9 deletions source/src/main/java/io/mycat/mycat2/ConfigLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,13 +135,24 @@ public static List<MySQLRepBean> loadMySQLRepBean(String datasourceuri){
repBean.setSwitchType(switchType);
List<Node> mysqlNodes=getChildNodes(curRepNode,"mysql");
List<MySQLBean> allMysqls=mysqlNodes.stream().map(mysqlNode->{
NamedNodeMap attrs = mysqlNode.getAttributes();
String ip=getAttribute(attrs,"ip",null);
String user=getAttribute(attrs,"user",null);
String password=getAttribute(attrs,"password",null);
int port=getIntAttribute(attrs,"port",3306);
MySQLBean mysql=new MySQLBean(ip,port,user,password);
return mysql;}).collect(Collectors.toList()) ;
NamedNodeMap attrs = mysqlNode.getAttributes();
String hostName = getAttribute(attrs, "hostname", null);
String ip=getAttribute(attrs,"ip",null);
String user=getAttribute(attrs,"user",null);
String password=getAttribute(attrs,"password",null);
int port=getIntAttribute(attrs,"port",3306);
Integer minCon = getIntAttribute(attrs, "min-con", null);
Integer maxCon = getIntAttribute(attrs, "max-con", null);

MySQLBean mysql=new MySQLBean(ip,port,user,password);
if (hostName != null)
mysql.setHostName(hostName);
if (minCon != null)
mysql.setMinCon(minCon);
if (maxCon != null)
mysql.setMaxCon(maxCon);
return mysql;
}).collect(Collectors.toList()) ;
repBean.setMysqls(allMysqls);
list.add(repBean);
}
Expand All @@ -161,15 +172,15 @@ private static String getAttribute(NamedNodeMap map,String attr,String defaultVa
{
return getValue(map.getNamedItem(attr),defaultVal);
}
private static int getIntAttribute(NamedNodeMap map,String attr,int defaultVal)
private static Integer getIntAttribute(NamedNodeMap map,String attr,Integer defaultVal)
{
return getIntValue(map.getNamedItem(attr),defaultVal);
}
private static String getValue(Node node,String defaultVal)
{
return node==null?defaultVal:node.getNodeValue();
}
private static int getIntValue(Node node,int defaultVal)
private static Integer getIntValue(Node node,Integer defaultVal)
{
return node==null?defaultVal:Integer.valueOf(node.getNodeValue());
}
Expand Down
85 changes: 53 additions & 32 deletions source/src/main/java/io/mycat/mycat2/MySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

import io.mycat.mycat2.beans.*;
import io.mycat.mysql.AutoCommit;
import io.mycat.mysql.Capabilities;
import io.mycat.mysql.Isolation;
import io.mycat.mysql.packet.HandshakePacket;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.proxy.BufferOptState;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.ProxyRuntime;
Expand Down Expand Up @@ -49,6 +51,17 @@ public class MySQLSession extends UserProxySession {
* 事务隔离级别
*/
public Isolation isolation = Isolation.REPEATED_READ;

//当前接收到的包类型
public enum CurrPacketType{
Full,LongHalfPacket,ShortHalfPacket
}


/**
* 事务提交方式
*/
public AutoCommit autoCommit = AutoCommit.OFF;

/**
* 认证中的seed报文数据
Expand Down Expand Up @@ -82,7 +95,7 @@ protected int getServerCapabilities() {

/**
* 回应客户端(front或Sever)OK 报文。
*
*
* @param pkg
* ,必须要是OK报文或者Err报文
* @throws IOException
Expand All @@ -92,18 +105,20 @@ public void responseOKOrError(MySQLPacket pkg, boolean front) throws IOException
frontBuffer.changeOwner(true);
pkg.write(this.frontBuffer);
frontBuffer.flip();
frontBuffer.readIndex=frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.frontChannel);
} else {
frontBuffer.changeOwner(false);
pkg.write(this.frontBuffer);
frontBuffer.flip();
frontBuffer.readIndex=frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.backendChannel);
}
}

/**
* 给客户端(front)发送认证报文
*
*
* @throws IOException
*/
public void sendAuthPackge() throws IOException {
Expand All @@ -129,8 +144,9 @@ public void sendAuthPackge() throws IOException {
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this.frontBuffer);
// 进行读取状态的切换,即将写状态切换 为读取状态
//设置frontBuffer 为读取状态
frontBuffer.flip();
frontBuffer.readIndex = frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.frontChannel);
}

Expand All @@ -141,59 +157,67 @@ public MySQLSession(BufferPool bufPool, Selector nioSelector, SocketChannel fron

/**
* 向前端发送数据报文,需要先确定为Write状态并确保写入位置的正确(frontBuffer.writeState)
*
*
* @param rawPkg
* @throws IOException
*/
public void answerFront(byte[] rawPkg) throws IOException {
frontBuffer.writeBytes(rawPkg);
frontBuffer.flip();
frontBuffer.readIndex = frontBuffer.writeIndex;
writeToChannel(frontBuffer, frontChannel);
}

/**
* 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE
* 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true
*
*
* @param proxyBuf
* @return
* @throws IOException
*/
public boolean resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded)
public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded)
throws IOException {
boolean readWholePkg = false;

ByteBuffer buffer = proxyBuf.getBuffer();
BufferOptState readState = proxyBuf.readState;
// 读取的偏移位置
int offset = readState.optPostion;
int offset = proxyBuf.readIndex;
// 读取的总长度
int limit = readState.optLimit;
int limit = proxyBuf.writeIndex;
// 读取当前的总长度
int totalLen = limit - offset;
if (totalLen == 0) {
return false;
if (totalLen == 0) { //透传情况下. 如果最后一个报文正好在buffer 最后位置,已经透传出去了.这里可能不会为零
return CurrPacketType.ShortHalfPacket;
}

if(curPackInf.remainsBytes==0&&curPackInf.crossBuffer){
curPackInf.crossBuffer = false;
}

// 如果当前跨多个报文
if (curPackInf.crossBuffer) {
if (curPackInf.remainsBytes <= totalLen) {
// 剩余报文结束
curPackInf.endPos = offset + curPackInf.remainsBytes;
offset += curPackInf.remainsBytes; //继续处理下一个报文
proxyBuf.readIndex = offset;
curPackInf.remainsBytes = 0;
readWholePkg = true;
} else {// 剩余报文还没读完,等待下一次读取
curPackInf.startPos = 0;
curPackInf.remainsBytes -= totalLen;
curPackInf.endPos = limit;
readWholePkg = false;
proxyBuf.readIndex = curPackInf.endPos;
return CurrPacketType.LongHalfPacket;
}
}
// 验证当前指针位置是否
else if (!ParseUtil.validateHeader(offset, limit)) {
//验证当前指针位置是否
if (!ParseUtil.validateHeader(offset, limit)) {
//收到短半包
logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, limit);
readWholePkg = false;
return CurrPacketType.ShortHalfPacket;
}

// 解包获取包的数据长度
//解包获取包的数据长度
int pkgLength = ParseUtil.getPacketLength(buffer, offset);
// 解析报文类型
final byte packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize);
Expand All @@ -203,19 +227,18 @@ else if (!ParseUtil.validateHeader(offset, limit)) {
curPackInf.pkgLength = pkgLength;
// 设置偏移位置
curPackInf.startPos = offset;
// 设置跨buffer为false

curPackInf.crossBuffer = false;

curPackInf.remainsBytes = 0;
// 如果当前需要跨buffer处理
if ((offset + pkgLength) > limit) {
logger.debug(
"Not a whole packet: required length = {} bytes, cur total length = {} bytes, "
"Not a whole packet: required length = {} bytes, cur total length = {} bytes, limit ={}, "
+ "ready to handle the next read event",
getSessionId(), buffer.hashCode(), pkgLength, limit);
curPackInf.crossBuffer = true;
curPackInf.remainsBytes = offset + pkgLength - limit;
pkgLength, (limit-offset),limit);
curPackInf.endPos = limit;
readWholePkg = false;
return CurrPacketType.LongHalfPacket;
} else {
// 读到完整报文
curPackInf.endPos = curPackInf.pkgLength + curPackInf.startPos;
Expand All @@ -228,13 +251,11 @@ else if (!ParseUtil.validateHeader(offset, limit)) {
" session {} packet: startPos={}, offset = {}, length = {}, type = {}, cur total length = {},pkg HEX\r\n {}",
getSessionId(), curPackInf.startPos, offset, pkgLength, packetType, limit, hexs);
}
readWholePkg = true;
}
if (markReaded) {
readState.optPostion = curPackInf.endPos;
if (markReaded) {
proxyBuf.readIndex = curPackInf.endPos;
}
return CurrPacketType.Full;
}
return readWholePkg;

}

public void close(boolean normal, String hint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public MySQLSession createSession(Object keyAttachment, BufferPool bufPool, Sele
session.setCurNIOHandler(MySQLClientAuthHandler.INSTANCE);
// 默认为透传命令模式
session.curSQLCommand = DirectPassthrouhCmd.INSTANCE;

// 设置 前端拥有
session.frontBuffer.changeOwner(true);
// 向MySQL Client发送认证报文
session.sendAuthPackge();
session.setSessionManager(this);
Expand Down
11 changes: 1 addition & 10 deletions source/src/main/java/io/mycat/mycat2/beans/MySQLBean.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ public class MySQLBean {
private int port;
private String user;
private String password;
private String defaultSchema = "mysql";
private int maxCon = 1000;
private int minCon = 1;

Expand Down Expand Up @@ -89,14 +88,6 @@ public void setPassword(String password) {
this.password = password;
}

public String getDefaultSchema() {
return defaultSchema;
}

public void setDefaultSchema(String defaultSchema) {
this.defaultSchema = defaultSchema;
}

public int getMaxCon() {
return maxCon;
}
Expand All @@ -116,7 +107,7 @@ public void setMinCon(int minCon) {
@Override
public String toString() {
return "MySQLBean [hostName=" + hostName + ", ip=" + ip + ", port=" + port + ", user=" + user + ", password="
+ password + ", defaultSchema=" + defaultSchema + ", maxCon=" + maxCon + ", minCon=" + minCon + "]";
+ password + ", maxCon=" + maxCon + ", minCon=" + minCon + "]";
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
*/
public class MySQLPackageInf {
public int pkgType;
public byte pkgType;
public boolean crossBuffer;
public int startPos;
public int endPos;
Expand All @@ -14,4 +14,9 @@ public class MySQLPackageInf {
* 还有多少字节才结束,仅对跨多个Buffer的MySQL报文有意义(crossBuffer=true)
*/
public int remainsBytes;
@Override
public String toString() {
return "MySQLPackageInf [pkgType=" + pkgType + ", crossBuffer=" + crossBuffer + ", startPos=" + startPos
+ ", endPos=" + endPos + ", pkgLength=" + pkgLength + ", remainsBytes=" + remainsBytes + "]";
}
}
Loading

0 comments on commit a1fe619

Please sign in to comment.