diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index 8702984..bd91703 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -6,6 +6,7 @@ import java.nio.channels.SocketChannel; import io.mycat.mycat2.beans.*; +import io.mycat.mysql.AutoCommit; import io.mycat.mysql.Capabilities; import io.mycat.mysql.Isolation; import io.mycat.mysql.packet.HandshakePacket; @@ -50,6 +51,11 @@ public class MySQLSession extends UserProxySession { */ public Isolation isolation = Isolation.REPEATED_READ; + /** + * 事务提交方式 + */ + public AutoCommit autoCommit = AutoCommit.OFF; + /** * 认证中的seed报文数据 */ @@ -82,7 +88,7 @@ protected int getServerCapabilities() { /** * 回应客户端(front或Sever)OK 报文。 - * + * * @param pkg * ,必须要是OK报文或者Err报文 * @throws IOException @@ -103,7 +109,7 @@ public void responseOKOrError(MySQLPacket pkg, boolean front) throws IOException /** * 给客户端(front)发送认证报文 - * + * * @throws IOException */ public void sendAuthPackge() throws IOException { @@ -141,7 +147,7 @@ public MySQLSession(BufferPool bufPool, Selector nioSelector, SocketChannel fron /** * 向前端发送数据报文,需要先确定为Write状态并确保写入位置的正确(frontBuffer.writeState) - * + * * @param rawPkg * @throws IOException */ @@ -154,7 +160,7 @@ public void answerFront(byte[] rawPkg) throws IOException { /** * 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE * 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true - * + * * @param proxyBuf * @return * @throws IOException diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java index 2f8893b..6ecb170 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMycatSessionHandler.java @@ -134,7 +134,7 @@ public void onBackendRead(MySQLSession session) throws IOException { ProxyBuffer backendBuffer = session.frontBuffer; - if (session.resolveMySQLPackage(backendBuffer, session.curFrontMSQLPackgInf, false) == false||!session.curFrontMSQLPackgInf.crossBuffer) { + if (session.resolveMySQLPackage(backendBuffer, session.curFrontMSQLPackgInf, false) == false && !session.curFrontMSQLPackgInf.crossBuffer) { // 没有读到完整报文, 也不是挎包 return; } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java index a876e75..2ee0b9a 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java @@ -11,81 +11,55 @@ import io.mycat.mysql.packet.QueryPacket; import io.mycat.proxy.ProxyBuffer; +import javax.management.Query; + /** * Created by ynfeng on 2017/8/13. *
* 同步状态至后端数据库,包括:字符集,事务,隔离级别等 */ public class BackendSynchronzationTask extends AbstractBackendIOTask { - private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); - private static QueryPacket[] CMDS = new QueryPacket[3]; - private int processCmd = 0; - - static { - QueryPacket isolationSynCmd = new QueryPacket(); - isolationSynCmd.packetId = 0; - - QueryPacket charsetSynCmd = new QueryPacket(); - charsetSynCmd.packetId = 0; - - QueryPacket transactionSynCmd = new QueryPacket(); - transactionSynCmd.packetId = 0; - - CMDS[0] = isolationSynCmd; - CMDS[1] = charsetSynCmd; - CMDS[2] = transactionSynCmd; - } - - public BackendSynchronzationTask(MySQLSession session) throws IOException { - super(session); - this.processCmd = 0; - syncState(session); - } - - private void syncState(MySQLSession session) throws IOException { - logger.info("synchronzation state to bakcend.session=" + session.toString()); - ProxyBuffer frontBuffer = session.frontBuffer; - frontBuffer.reset(); - // TODO 字符集映射和前端事务设置还未完成,这里只用隔离级别模拟实现(其实都是SET xxx效果一样),回头补充 - switch (processCmd) { - case 1: - case 2: - case 0: - CMDS[processCmd].sql = session.isolation.getCmd(); - CMDS[processCmd].write(frontBuffer); + private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); - frontBuffer.flip(); - session.writeToChannel(frontBuffer, session.backendChannel); - processCmd++; - break; - default: - this.finished(true); - break; - } + public BackendSynchronzationTask(MySQLSession session) throws IOException { + super(session); + syncState(session); + } - } + private void syncState(MySQLSession session) throws IOException { + logger.info("synchronzation state to bakcend.session=" + session.toString()); + ProxyBuffer frontBuffer = session.frontBuffer; + frontBuffer.reset(); + // TODO 字符集映射未完成 + QueryPacket queryPacket = new QueryPacket(); + queryPacket.packetId = 0; + queryPacket.sql = session.isolation.getCmd() + session.autoCommit.getCmd() + session.isolation.getCmd(); + queryPacket.write(frontBuffer); + frontBuffer.flip(); + session.writeToChannel(frontBuffer, session.backendChannel); + } - @Override - public void onBackendRead(MySQLSession session) throws IOException { - session.frontBuffer.reset(); - if (!session.readFromChannel(session.frontBuffer, session.backendChannel) - || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 - return; - } - if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { - syncState(session); - } else { - // TODO 同步失败如何处理??是否应该关闭此连接?? - errPkg = new ErrorPacket(); - errPkg.read(session.frontBuffer); - logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); - this.finished(false); - } - } + @Override + public void onBackendRead(MySQLSession session) throws IOException { + session.frontBuffer.reset(); + if (!session.readFromChannel(session.frontBuffer, session.backendChannel) + || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 + return; + } + if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) { + session.frontBuffer.reset(); + this.finished(true); + } else { + errPkg = new ErrorPacket(); + errPkg.read(session.frontBuffer); + logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message); + this.finished(false); + } + } - @Override - public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { - logger.warn(" socket closed not handlerd" + session.toString()); - } + @Override + public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { + logger.warn(" socket closed not handlerd" + session.toString()); + } } diff --git a/source/src/main/java/io/mycat/mysql/AutoCommit.java b/source/src/main/java/io/mycat/mysql/AutoCommit.java new file mode 100644 index 0000000..c9d1b1f --- /dev/null +++ b/source/src/main/java/io/mycat/mysql/AutoCommit.java @@ -0,0 +1,19 @@ +package io.mycat.mysql; + +/** + * Created by ynfeng on 2017/8/19. + */ +public enum AutoCommit { + ON("SET autocommit = 1;"), + OFF("SET autocommit = 0;"); + + AutoCommit(String cmd) { + this.cmd = cmd; + } + + private String cmd; + + public String getCmd() { + return cmd; + } +} diff --git a/source/src/main/java/io/mycat/mysql/Isolation.java b/source/src/main/java/io/mycat/mysql/Isolation.java index b382f4b..277063f 100644 --- a/source/src/main/java/io/mycat/mysql/Isolation.java +++ b/source/src/main/java/io/mycat/mysql/Isolation.java @@ -44,9 +44,5 @@ public enum Isolation { public String getCmd() { return cmd; } - - public void setCmd(String cmd) { - this.cmd = cmd; - } }