Skip to content

Commit

Permalink
Merge pull request MyCATApache#26 from yanjunli/master
Browse files Browse the repository at this point in the history
重构proxyBuffer
  • Loading branch information
apachemycat authored Aug 21, 2017
2 parents a06e0cb + c4078ae commit 80d7676
Show file tree
Hide file tree
Showing 14 changed files with 438 additions and 270 deletions.
71 changes: 43 additions & 28 deletions source/src/main/java/io/mycat/mycat2/MySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
import java.nio.ByteBuffer;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Map;

import io.mycat.mycat2.beans.*;
import io.mycat.mysql.AutoCommit;
import io.mycat.mysql.Capabilities;
import io.mycat.mysql.Isolation;
import io.mycat.mysql.packet.HandshakePacket;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.proxy.BufferOptState;
import io.mycat.proxy.BufferPool;
import io.mycat.proxy.ProxyBuffer;
import io.mycat.proxy.ProxyRuntime;
Expand Down Expand Up @@ -50,6 +51,12 @@ public class MySQLSession extends UserProxySession {
* 事务隔离级别
*/
public Isolation isolation = Isolation.REPEATED_READ;

//当前接收到的包类型
public enum CurrPacketType{
Full,LongHalfPacket,ShortHalfPacket
}


/**
* 事务提交方式
Expand Down Expand Up @@ -98,11 +105,13 @@ public void responseOKOrError(MySQLPacket pkg, boolean front) throws IOException
frontBuffer.changeOwner(true);
pkg.write(this.frontBuffer);
frontBuffer.flip();
frontBuffer.readIndex=frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.frontChannel);
} else {
frontBuffer.changeOwner(false);
pkg.write(this.frontBuffer);
frontBuffer.flip();
frontBuffer.readIndex=frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.backendChannel);
}
}
Expand Down Expand Up @@ -135,8 +144,9 @@ public void sendAuthPackge() throws IOException {
hs.serverStatus = 2;
hs.restOfScrambleBuff = rand2;
hs.write(this.frontBuffer);
// 进行读取状态的切换,即将写状态切换 为读取状态
//设置frontBuffer 为读取状态
frontBuffer.flip();
frontBuffer.readIndex = frontBuffer.writeIndex;
this.writeToChannel(frontBuffer, this.frontChannel);
}

Expand All @@ -154,6 +164,7 @@ public MySQLSession(BufferPool bufPool, Selector nioSelector, SocketChannel fron
public void answerFront(byte[] rawPkg) throws IOException {
frontBuffer.writeBytes(rawPkg);
frontBuffer.flip();
frontBuffer.readIndex = frontBuffer.writeIndex;
writeToChannel(frontBuffer, frontChannel);
}

Expand All @@ -165,41 +176,48 @@ public void answerFront(byte[] rawPkg) throws IOException {
* @return
* @throws IOException
*/
public boolean resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded)
public CurrPacketType resolveMySQLPackage(ProxyBuffer proxyBuf, MySQLPackageInf curPackInf, boolean markReaded)
throws IOException {
boolean readWholePkg = false;

ByteBuffer buffer = proxyBuf.getBuffer();
BufferOptState readState = proxyBuf.readState;
// 读取的偏移位置
int offset = readState.optPostion;
int offset = proxyBuf.readIndex;
// 读取的总长度
int limit = readState.optLimit;
int limit = proxyBuf.writeIndex;
// 读取当前的总长度
int totalLen = limit - offset;
if (totalLen == 0) {
return false;
if (totalLen == 0) { //透传情况下. 如果最后一个报文正好在buffer 最后位置,已经透传出去了.这里可能不会为零
return CurrPacketType.ShortHalfPacket;
}

if(curPackInf.remainsBytes==0&&curPackInf.crossBuffer){
curPackInf.crossBuffer = false;
}

// 如果当前跨多个报文
if (curPackInf.crossBuffer) {
if (curPackInf.remainsBytes <= totalLen) {
// 剩余报文结束
curPackInf.endPos = offset + curPackInf.remainsBytes;
offset += curPackInf.remainsBytes; //继续处理下一个报文
proxyBuf.readIndex = offset;
curPackInf.remainsBytes = 0;
readWholePkg = true;
} else {// 剩余报文还没读完,等待下一次读取
curPackInf.startPos = 0;
curPackInf.remainsBytes -= totalLen;
curPackInf.endPos = limit;
readWholePkg = false;
proxyBuf.readIndex = curPackInf.endPos;
return CurrPacketType.LongHalfPacket;
}
}
// 验证当前指针位置是否
else if (!ParseUtil.validateHeader(offset, limit)) {
//验证当前指针位置是否
if (!ParseUtil.validateHeader(offset, limit)) {
//收到短半包
logger.debug("not read a whole packet ,session {},offset {} ,limit {}", getSessionId(), offset, limit);
readWholePkg = false;
return CurrPacketType.ShortHalfPacket;
}

// 解包获取包的数据长度
//解包获取包的数据长度
int pkgLength = ParseUtil.getPacketLength(buffer, offset);
// 解析报文类型
final byte packetType = buffer.get(offset + ParseUtil.msyql_packetHeaderSize);
Expand All @@ -209,19 +227,18 @@ else if (!ParseUtil.validateHeader(offset, limit)) {
curPackInf.pkgLength = pkgLength;
// 设置偏移位置
curPackInf.startPos = offset;
// 设置跨buffer为false

curPackInf.crossBuffer = false;

curPackInf.remainsBytes = 0;
// 如果当前需要跨buffer处理
if ((offset + pkgLength) > limit) {
logger.debug(
"Not a whole packet: required length = {} bytes, cur total length = {} bytes, "
"Not a whole packet: required length = {} bytes, cur total length = {} bytes, limit ={}, "
+ "ready to handle the next read event",
getSessionId(), buffer.hashCode(), pkgLength, limit);
curPackInf.crossBuffer = true;
curPackInf.remainsBytes = offset + pkgLength - limit;
pkgLength, (limit-offset),limit);
curPackInf.endPos = limit;
readWholePkg = false;
return CurrPacketType.LongHalfPacket;
} else {
// 读到完整报文
curPackInf.endPos = curPackInf.pkgLength + curPackInf.startPos;
Expand All @@ -234,13 +251,11 @@ else if (!ParseUtil.validateHeader(offset, limit)) {
" session {} packet: startPos={}, offset = {}, length = {}, type = {}, cur total length = {},pkg HEX\r\n {}",
getSessionId(), curPackInf.startPos, offset, pkgLength, packetType, limit, hexs);
}
readWholePkg = true;
}
if (markReaded) {
readState.optPostion = curPackInf.endPos;
if (markReaded) {
proxyBuf.readIndex = curPackInf.endPos;
}
return CurrPacketType.Full;
}
return readWholePkg;

}

public void close(boolean normal, String hint) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ public MySQLSession createSession(Object keyAttachment, BufferPool bufPool, Sele
session.setCurNIOHandler(MySQLClientAuthHandler.INSTANCE);
// 默认为透传命令模式
session.curSQLCommand = DirectPassthrouhCmd.INSTANCE;

// 设置 前端拥有
session.frontBuffer.changeOwner(true);
// 向MySQL Client发送认证报文
session.sendAuthPackge();
session.setSessionManager(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*
*/
public class MySQLPackageInf {
public int pkgType;
public byte pkgType;
public boolean crossBuffer;
public int startPos;
public int endPos;
Expand All @@ -14,4 +14,9 @@ public class MySQLPackageInf {
* 还有多少字节才结束,仅对跨多个Buffer的MySQL报文有意义(crossBuffer=true)
*/
public int remainsBytes;
@Override
public String toString() {
return "MySQLPackageInf [pkgType=" + pkgType + ", crossBuffer=" + crossBuffer + ", startPos=" + startPos
+ ", endPos=" + endPos + ", pkgLength=" + pkgLength + ", remainsBytes=" + remainsBytes + "]";
}
}
138 changes: 126 additions & 12 deletions source/src/main/java/io/mycat/mycat2/cmds/DirectPassthrouhCmd.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,17 @@

import java.io.IOException;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.mycat.mycat2.MySQLSession;
import io.mycat.mycat2.SQLCommand;
import io.mycat.mycat2.beans.MySQLPackageInf;
import io.mycat.mysql.packet.MySQLPacket;
import io.mycat.proxy.ProxyBuffer;

/**
Expand All @@ -15,31 +22,138 @@
*
*/
public class DirectPassthrouhCmd implements SQLCommand {

public static final DirectPassthrouhCmd INSTANCE=new DirectPassthrouhCmd();

private static final Logger logger = LoggerFactory.getLogger(DirectPassthrouhCmd.class);

public static final DirectPassthrouhCmd INSTANCE = new DirectPassthrouhCmd();

//********** 临时处理,等待与KK 代码合并
private static final Map<Byte,Integer> finishPackage = new HashMap<>();

private Map<Byte,Integer> curfinishPackage = new HashMap<>();

static{
finishPackage.put(MySQLPacket.OK_PACKET, 1);
finishPackage.put(MySQLPacket.ERROR_PACKET, 1);
finishPackage.put(MySQLPacket.EOF_PACKET, 2);
}
//********** 临时处理,等待与KK 代码合并

@Override
public boolean procssSQL(MySQLSession session, boolean backresReceived) throws IOException {

if(backresReceived){
return backendProcessor(session,session.curFrontMSQLPackgInf,session.frontChannel,session.frontBuffer);
}else{
return frontProcessor(session,backresReceived);
}
}

/**
* 前端报文处理
* @param session
* @return
* @throws IOException
*/
private boolean frontProcessor(MySQLSession session, boolean backresReceived) throws IOException{
curfinishPackage.putAll(finishPackage);
ProxyBuffer curBuffer = session.frontBuffer;
SocketChannel curChannel = session.backendChannel;
if (backresReceived) {// 收到后端发来的报文

curBuffer = session.frontBuffer;
curChannel = session.frontChannel;
// 切换 buffer 读写状态
curBuffer.flip();
// 改变 owner
curBuffer.changeOwner(false);
// 没有读取,直接透传时,需要指定 透传的数据 截止位置
curBuffer.readIndex = curBuffer.writeIndex;
//当前是前端报文处理器,如果是后端报文处理器调用,不切换Owner
session.writeToChannel(curBuffer, curChannel);
return false;
}

/**
* 后端报文处理
* @param session
* @return
* @throws IOException
*/
private boolean backendProcessor(MySQLSession session,MySQLPackageInf curMSQLPackgInf,
SocketChannel curChannel,ProxyBuffer curBuffer)throws IOException{

if(!session.readFromChannel(session.frontBuffer, session.backendChannel)){
return false;
}

// 直接透传报文
curBuffer.changeOwner(!curBuffer.frontUsing());
boolean isallfinish = false;
boolean isContinue = true;
while(isContinue){
switch(session.resolveMySQLPackage(curBuffer, curMSQLPackgInf,true)){
case Full:
Integer count = curfinishPackage.get(curMSQLPackgInf.pkgType);
if(count!=null){
if(--count==0){
isallfinish = true;
curfinishPackage.clear();
}
curfinishPackage.put(curMSQLPackgInf.pkgType, count);
}
if(curBuffer.readIndex == curBuffer.writeIndex){
isContinue = false;
}else{
isContinue = true;
}
break;
case LongHalfPacket:
if(curMSQLPackgInf.crossBuffer){
//发生过透传的半包,往往包的长度超过了buffer 的长度.
logger.debug(" readed crossBuffer LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf);
}else if(!isfinishPackage(curMSQLPackgInf)){
//不需要整包解析的长半包透传. result set .这种半包直接透传
curMSQLPackgInf.crossBuffer=true;
curBuffer.readIndex = curMSQLPackgInf.endPos;
curMSQLPackgInf.remainsBytes = curMSQLPackgInf.pkgLength-(curMSQLPackgInf.endPos - curMSQLPackgInf.startPos);
logger.debug(" readed LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf);
logger.debug(" curBuffer {}", curBuffer);
}else{
// 读取到了EOF/OK/ERROR 类型长半包 是需要保证是整包的.
logger.debug(" readed finished LongHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf);
// TODO 保证整包的机制
}
isContinue = false;
break;
case ShortHalfPacket:
logger.debug(" readed ShortHalfPacket ,curMSQLPackgInf is {}", curMSQLPackgInf);
isContinue = false;
break;
}
};

//切换buffer 读写状态
curBuffer.flip();
if(isallfinish){
curBuffer.changeOwner(true);
}
// 直接透传报文
session.writeToChannel(curBuffer, curChannel);
session.modifySelectKey();
/**
* 当前命令处理是否全部结束,全部结束时需要清理资源
*/
return false;
}

private boolean isfinishPackage(MySQLPackageInf curMSQLPackgInf)throws IOException{
switch(curMSQLPackgInf.pkgType){
case MySQLPacket.OK_PACKET:
case MySQLPacket.ERROR_PACKET:
case MySQLPacket.EOF_PACKET:
return true;
default:
return false;
}
}

@Override
public void clearResouces(boolean sessionCLosed) {
// nothint to do

// TODO Auto-generated method stub
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public void commandProc(MySQLSession session) throws IOException {

byte[] sql = session.frontBuffer.getBytes(
session.curFrontMSQLPackgInf.startPos + MySQLPacket.packetHeaderSize + 1,
session.curFrontMSQLPackgInf.endPos - MySQLPacket.packetHeaderSize - 1);
session.curFrontMSQLPackgInf.pkgLength - MySQLPacket.packetHeaderSize - 1);
sqlParser.parse(sql, sqlContext);
if (sqlContext.hasAnnotation()) {
// 此处添加注解处理
Expand Down
Loading

0 comments on commit 80d7676

Please sign in to comment.