diff --git a/source/src/main/java/io/mycat/mycat2/MySQLSession.java b/source/src/main/java/io/mycat/mycat2/MySQLSession.java index b4ae625..9924463 100644 --- a/source/src/main/java/io/mycat/mycat2/MySQLSession.java +++ b/source/src/main/java/io/mycat/mycat2/MySQLSession.java @@ -120,8 +120,8 @@ public void sendAuthPackge() throws IOException { // 发送握手数据包 HandshakePacket hs = new HandshakePacket(); hs.packetId = 0; - hs.protocolVersion = Versions.PROTOCOL_VERSION; - hs.serverVersion = Versions.SERVER_VERSION; + hs.protocolVersion = Version.PROTOCOL_VERSION; + hs.serverVersion = Version.SERVER_VERSION; hs.threadId = this.getSessionId(); hs.seed = rand1; hs.serverCapabilities = getServerCapabilities(); @@ -151,44 +151,7 @@ public void answerFront(byte[] rawPkg) throws IOException { writeToChannel(frontBuffer, frontChannel); } - /** - * 从Socket中读取数据,通常在NIO事件中调用,比如onFrontRead/onBackendRead - * - * @param session - * @param readFront - * @return - * @throws IOException - */ - public boolean readSocket(boolean readFront) throws IOException { - //默认获得后端的buffer和前端的channel - ProxyBuffer buffer = backendBuffer; - SocketChannel channel = frontChannel; - - //如果标识当前为false,则表示当前为读取后端通道的数据 - if (!readFront) { - //则当前的为前端的buffer和后端的channel - buffer = frontBuffer; - channel = backendChannel; - } - //进行通道数据的读取 - int readed = readFromChannel(buffer, channel); - logger.debug("readed {} total bytes ", readed); - if (readed == -1) { - closeSocket(channel, true, "read EOF."); - return false; - } else if (readed == 0) { - logger.warn("read 0 bytes ,try compact buffer " + (readFront ? " front " : "backend ") + " ,session Id :" - + this.getSessionId()); - buffer.compact(true); - // todo curMSQLPackgInf - // 也许要对应的改变位置,如果curMSQLPackgInf是跨Package的,则可能无需改变信息 - // curPackInf. - return false; - } - //更新当前数据读取到的长度信息 - buffer.updateReadLimit(); - return true; - } + /** * 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE diff --git a/source/src/main/java/io/mycat/mycat2/MycatCore.java b/source/src/main/java/io/mycat/mycat2/MycatCore.java index 94a9aa6..6fe83d6 100644 --- a/source/src/main/java/io/mycat/mycat2/MycatCore.java +++ b/source/src/main/java/io/mycat/mycat2/MycatCore.java @@ -40,9 +40,7 @@ import io.mycat.proxy.NIOAcceptor; import io.mycat.proxy.ProxyReactorThread; import io.mycat.proxy.ProxyRuntime; -import io.mycat.proxy.man.AdminCommandResovler; import io.mycat.proxy.man.ClusterNode; -import io.mycat.proxy.man.DefaultAdminSessionHandler; import io.mycat.proxy.man.DefaultAdminSessionManager; import io.mycat.proxy.man.MyCluster; diff --git a/source/src/main/java/io/mycat/mycat2/Versions.java b/source/src/main/java/io/mycat/mycat2/Versions.java deleted file mode 100644 index 9063f41..0000000 --- a/source/src/main/java/io/mycat/mycat2/Versions.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (c) 2016, OpenCloudDB/MyCAT and/or its affiliates. All rights reserved. - * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. - * - * This code is free software;Designed and Developed mainly by many Chinese - * opensource volunteers. you can redistribute it and/or modify it under the - * terms of the GNU General Public License version 2 only, as published by the - * Free Software Foundation. - * - * This code is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License - * version 2 for more details (a copy is included in the LICENSE file that - * accompanied this code). - * - * You should have received a copy of the GNU General Public License version - * 2 along with this work; if not, write to the Free Software Foundation, - * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. - * - * Any questions about this component can be directed to it's project Web address - * https://mycat.io/ - * - */ -package io.mycat.mycat2; - -/** - * @author mycat - */ -public class Versions { - - /**协议版本**/ - public static final byte PROTOCOL_VERSION = 10; - - /**服务器版**/ - public static final byte[] SERVER_VERSION = "Fake MySQL server".getBytes(); - -} \ No newline at end of file diff --git a/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCmd.java b/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCmd.java index 3c9746d..772c656 100644 --- a/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCmd.java +++ b/source/src/main/java/io/mycat/mycat2/cmds/LoadDataCmd.java @@ -1,176 +1,176 @@ -package io.mycat.mycat2.cmds; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.channels.SocketChannel; -import java.util.Arrays; - -import io.mycat.mycat2.MySQLSession; -import io.mycat.mycat2.SQLCommand; -import io.mycat.proxy.ProxyBuffer; - -/** - * 进行load data的命令数据透传 - * - * @author wuzhihui - * - */ -public class LoadDataCmd implements SQLCommand { - - /** - * loaddata传送结束标识长度 - */ - private static final int FLAGLENGTH = 4; - - /** - * 结束flag标识 - */ - private byte[] overFlag = new byte[FLAGLENGTH]; - - /** - * 提供load data的命令处理 - */ - public static final LoadDataCmd INSTANCE = new LoadDataCmd(); - - @Override - public boolean procssSQL(MySQLSession session, boolean backresReceived) throws IOException { - - ProxyBuffer curBuffer = session.backendBuffer; - SocketChannel curChannel = session.backendChannel; - if (backresReceived) {// 收到后端发来的报文 - - curBuffer = session.frontBuffer; - curChannel = session.frontChannel; - } - - // 进行结束符的读取 - this.readOverByte(curBuffer); - - session.writeToChannel(curBuffer, curChannel); - - // 检查结束切换状态 - if (checkOver()) { - session.modifySelectKey(); - curBuffer.reset(); - return true; - } - - curBuffer.reset(); - - return false; - } - - /** - * 进行结束符的读取 - * - * @param curBuffer - * buffer数组信息 - */ - private void readOverByte(ProxyBuffer curBuffer) { - // 获取当前buffer的最后 - ByteBuffer buffer = curBuffer.getBuffer(); - - // 如果数据的长度超过了,结束符的长度,可直接提取结束符 - if (buffer.position() >= FLAGLENGTH) { - int opts = curBuffer.readState.optLimit; - buffer.position(opts - FLAGLENGTH); - buffer.get(overFlag, 0, FLAGLENGTH); - buffer.position(opts); - } - // 如果小于结束符,说明需要进行两个byte数组的合并 - else { - int opts = curBuffer.readState.optLimit; - // 计算放入的位置 - int moveSize = FLAGLENGTH - opts; - int index = 0; - // 进行数组的移动,以让出空间进行放入新的数据 - for (int i = FLAGLENGTH - moveSize; i < FLAGLENGTH; i++) { - overFlag[index] = overFlag[i]; - index++; - } - // 读取数据 - buffer.position(0); - buffer.get(overFlag, moveSize, opts); - buffer.position(opts); - } - - } - - /** - * 进行结束符的检查, - * - * 数据的结束符为0,0,0,包序,即可以验证读取到3个连续0,即为结束 - * - * @return - */ - private boolean checkOver() { - for (int i = 0; i < overFlag.length - 1; i++) { - if (overFlag[i] != 0) { - return false; - } - } - return true; - } - - /** - * 测试方法,验证组合读取 - * - * @param buffer - * @param overFlag - * @return - */ - public static byte[] TestReadByte(ByteBuffer buffer, byte[] overFlag) { - // byte[] overFlag = new byte[FLAGLENGTH]; - - if (buffer.position() >= FLAGLENGTH) { - int opts = buffer.position(); - buffer.position(opts - FLAGLENGTH); - buffer.get(overFlag, 0, FLAGLENGTH); - buffer.position(opts); - } else { - - int opts = buffer.position(); - // 计算需要移动的位数 - int moveSize = FLAGLENGTH - opts; - int index = 0; - // 进行数组的移动,以让出空间进行放入新的数据 - for (int i = FLAGLENGTH - moveSize; i < FLAGLENGTH; i++) { - overFlag[index] = overFlag[i]; - index++; - } - // 读取数据 - buffer.position(0); - buffer.get(overFlag, moveSize, opts); - buffer.position(opts); - } - - return overFlag; - } - - @Override - public void clearResouces(boolean sessionCLosed) { - - } - - public static void main(String[] args) { - - ByteBuffer buffer = ByteBuffer.allocateDirect(30); - byte[] overFlag = new byte[FLAGLENGTH]; - - buffer.put((byte) 1); - buffer.put((byte) 2); - buffer.put((byte) 3); - buffer.put((byte) 4); - - overFlag = TestReadByte(buffer, overFlag); - - ByteBuffer buffer2 = ByteBuffer.allocateDirect(1); - buffer2.put((byte) 5); - - overFlag = TestReadByte(buffer2, overFlag); - - System.out.println(Arrays.toString(overFlag)); - - } - -} +package io.mycat.mycat2.cmds; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.Arrays; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.SQLCommand; +import io.mycat.proxy.ProxyBuffer; + +/** + * 进行load data的命令数据透传 + * + * @author wuzhihui + * + */ +public class LoadDataCmd implements SQLCommand { + + /** + * loaddata传送结束标识长度 + */ + private static final int FLAGLENGTH = 4; + + /** + * 结束flag标识 + */ + private byte[] overFlag = new byte[FLAGLENGTH]; + + /** + * 提供load data的命令处理 + */ + public static final LoadDataCmd INSTANCE = new LoadDataCmd(); + + @Override + public boolean procssSQL(MySQLSession session, boolean backresReceived) throws IOException { + + ProxyBuffer curBuffer = session.backendBuffer; + SocketChannel curChannel = session.backendChannel; + if (backresReceived) {// 收到后端发来的报文 + + curBuffer = session.frontBuffer; + curChannel = session.frontChannel; + } + + // 进行结束符的读取 + this.readOverByte(curBuffer); + + session.writeToChannel(curBuffer, curChannel); + + // 检查结束切换状态 + if (checkOver()) { + session.modifySelectKey(); + curBuffer.reset(); + return true; + } + + curBuffer.reset(); + + return false; + } + + /** + * 进行结束符的读取 + * + * @param curBuffer + * buffer数组信息 + */ + private void readOverByte(ProxyBuffer curBuffer) { + // 获取当前buffer的最后 + ByteBuffer buffer = curBuffer.getBuffer(); + + // 如果数据的长度超过了,结束符的长度,可直接提取结束符 + if (buffer.position() >= FLAGLENGTH) { + int opts = curBuffer.readState.optLimit; + buffer.position(opts - FLAGLENGTH); + buffer.get(overFlag, 0, FLAGLENGTH); + buffer.position(opts); + } + // 如果小于结束符,说明需要进行两个byte数组的合并 + else { + int opts = curBuffer.readState.optLimit; + // 计算放入的位置 + int moveSize = FLAGLENGTH - opts; + int index = 0; + // 进行数组的移动,以让出空间进行放入新的数据 + for (int i = FLAGLENGTH - moveSize; i < FLAGLENGTH; i++) { + overFlag[index] = overFlag[i]; + index++; + } + // 读取数据 + buffer.position(0); + buffer.get(overFlag, moveSize, opts); + buffer.position(opts); + } + + } + + /** + * 进行结束符的检查, + * + * 数据的结束符为0,0,0,包序,即可以验证读取到3个连续0,即为结束 + * + * @return + */ + private boolean checkOver() { + for (int i = 0; i < overFlag.length - 1; i++) { + if (overFlag[i] != 0) { + return false; + } + } + return true; + } + + /** + * 测试方法,验证组合读取 + * + * @param buffer + * @param overFlag + * @return + */ + public static byte[] TestReadByte(ByteBuffer buffer, byte[] overFlag) { + // byte[] overFlag = new byte[FLAGLENGTH]; + + if (buffer.position() >= FLAGLENGTH) { + int opts = buffer.position(); + buffer.position(opts - FLAGLENGTH); + buffer.get(overFlag, 0, FLAGLENGTH); + buffer.position(opts); + } else { + + int opts = buffer.position(); + // 计算需要移动的位数 + int moveSize = FLAGLENGTH - opts; + int index = 0; + // 进行数组的移动,以让出空间进行放入新的数据 + for (int i = FLAGLENGTH - moveSize; i < FLAGLENGTH; i++) { + overFlag[index] = overFlag[i]; + index++; + } + // 读取数据 + buffer.position(0); + buffer.get(overFlag, moveSize, opts); + buffer.position(opts); + } + + return overFlag; + } + + @Override + public void clearResouces(boolean sessionCLosed) { + + } + + public static void main(String[] args) { + + ByteBuffer buffer = ByteBuffer.allocateDirect(30); + byte[] overFlag = new byte[FLAGLENGTH]; + + buffer.put((byte) 1); + buffer.put((byte) 2); + buffer.put((byte) 3); + buffer.put((byte) 4); + + overFlag = TestReadByte(buffer, overFlag); + + ByteBuffer buffer2 = ByteBuffer.allocateDirect(1); + buffer2.put((byte) 5); + + overFlag = TestReadByte(buffer2, overFlag); + + System.out.println(Arrays.toString(overFlag)); + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java b/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java index 567ad82..fdf8d40 100644 --- a/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/DefaultMySQLStudySessionHandler.java @@ -19,7 +19,7 @@ public class DefaultMySQLStudySessionHandler extends DefaultDirectProxyHandler, @Override public void onFrontRead(final MySQLSession session) throws IOException { - boolean readed = session.readSocket(true); + boolean readed = session.readFromChannel(session.backendBuffer, session.frontChannel); ProxyBuffer backendBuffer = session.backendBuffer; if (readed == false) { return; @@ -105,7 +105,7 @@ private void syncSessionStateToBackend(MySQLSession mySQLSession) throws IOExcep } public void onBackendRead(MySQLSession session) throws IOException { - boolean readed = session.readSocket(false); + boolean readed = session.readFromChannel(session.frontBuffer, session.backendChannel); if (readed == false) { return; } diff --git a/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java b/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java index 2e2b65f..36b9640 100644 --- a/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/LoadDataHandler.java @@ -1,45 +1,45 @@ -package io.mycat.mycat2.net; - -import java.io.IOException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.mycat.mycat2.MySQLSession; -import io.mycat.mycat2.cmds.DirectPassthrouhCmd; -import io.mycat.proxy.ProxyBuffer; - -/** - * 负责处理通用的SQL命令,默认情况下透传 - * - * @author wuzhihui - * - */ -public class LoadDataHandler extends DefaultMycatSessionHandler { - private static Logger logger = LoggerFactory.getLogger(LoadDataHandler.class); - - /** - * 工厂方法实例对象 - */ - public static final LoadDataHandler INSTANCE = new LoadDataHandler(); - - @Override - public void onFrontRead(final MySQLSession session) throws IOException { - boolean readed = session.readSocket(true); - ProxyBuffer backendBuffer = session.backendBuffer; - if (readed == false) { - return; - } - if (session.curFrontMSQLPackgInf.endPos < backendBuffer.getReadOptState().optLimit) { - logger.warn("front contains multi package "); - } - // 交给SQLComand去处理 - if (session.curSQLCommand.procssSQL(session, false)) { - session.curSQLCommand.clearResouces(false); - session.curSQLCommand = DirectPassthrouhCmd.INSTANCE; - session.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); - } - - } - -} +package io.mycat.mycat2.net; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.mycat.mycat2.MySQLSession; +import io.mycat.mycat2.cmds.DirectPassthrouhCmd; +import io.mycat.proxy.ProxyBuffer; + +/** + * 负责处理通用的SQL命令,默认情况下透传 + * + * @author wuzhihui + * + */ +public class LoadDataHandler extends DefaultMycatSessionHandler { + private static Logger logger = LoggerFactory.getLogger(LoadDataHandler.class); + + /** + * 工厂方法实例对象 + */ + public static final LoadDataHandler INSTANCE = new LoadDataHandler(); + + @Override + public void onFrontRead(final MySQLSession session) throws IOException { + boolean readed = session.readFromChannel(session.backendBuffer,session.frontChannel); + ProxyBuffer backendBuffer = session.backendBuffer; + if (readed == false) { + return; + } + if (session.curFrontMSQLPackgInf.endPos < backendBuffer.getReadOptState().optLimit) { + logger.warn("front contains multi package "); + } + // 交给SQLComand去处理 + if (session.curSQLCommand.procssSQL(session, false)) { + session.curSQLCommand.clearResouces(false); + session.curSQLCommand = DirectPassthrouhCmd.INSTANCE; + session.setCurNIOHandler(DefaultMycatSessionHandler.INSTANCE); + } + + } + +} diff --git a/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java b/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java index f3e1cae..8ca05d5 100644 --- a/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java +++ b/source/src/main/java/io/mycat/mycat2/net/MySQLClientAuthHandler.java @@ -1,14 +1,9 @@ package io.mycat.mycat2.net; import java.io.IOException; -import java.net.InetSocketAddress; -import java.nio.channels.SelectionKey; -import java.nio.channels.Selector; -import java.nio.channels.SocketChannel; import io.mycat.mycat2.MySQLSession; import io.mycat.mysql.packet.AuthPacket; -import io.mycat.proxy.BufferPool; import io.mycat.proxy.DefaultDirectProxyHandler; import io.mycat.proxy.ProxyBuffer; import io.mycat.util.CharsetUtil; @@ -24,30 +19,10 @@ public class MySQLClientAuthHandler extends DefaultDirectProxyHandler prevProxyHandler; + protected ErrorPacket errPkg; + + public AbstractBackendIOTask(MySQLSession session) { + prevNetMode = session.netOptMode; + session.netOptMode = UserProxySession.NetOptMode.BackendRW; + this.session = session; + prevProxyHandler = session.getCurNIOHandler(); + } + + protected void finished(boolean success) throws IOException { + sessionRecover(); + onFinished(success); + + callBack.finished(session, this, success, this.errPkg); + } + + protected void sessionRecover() { + // 恢复Session原来的状态 + session.netOptMode = prevNetMode; + session.setCurNIOHandler(prevProxyHandler); + } + + protected void onFinished(boolean success) { + + } + + @Override + public void onBackendConnect(MySQLSession userSession, boolean success, String msg) throws IOException { + + } + + @Override + public void onBackendWrite(MySQLSession session) throws IOException { + + } + + @Override + public void setCallback(AsynTaskCallBack callBack) { + this.callBack = callBack; + + } + + + +} diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java index daeb67c..98803b0 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendConCreateTask.java @@ -15,10 +15,6 @@ import io.mycat.mysql.packet.ErrorPacket; import io.mycat.mysql.packet.HandshakePacket; import io.mycat.mysql.packet.MySQLPacket; -import io.mycat.proxy.NIOHandler; -import io.mycat.proxy.ProxyBuffer; -import io.mycat.proxy.UserProxySession; -import io.mycat.proxy.UserProxySession.NetOptMode; import io.mycat.util.CharsetUtil; import io.mycat.util.SecurityUtil; @@ -28,38 +24,21 @@ * @author wuzhihui * */ -public class BackendConCreateTask implements BackendIOTask { +public class BackendConCreateTask extends AbstractBackendIOTask { private static Logger logger = LoggerFactory.getLogger(BackendConCreateTask.class); - private ProxyBuffer prevFrontBuffer; - private ProxyBuffer prevBackendBuffer; - private NetOptMode prevNetMode; - private NIOHandler prevProxyHandler; - private final MySQLSession session; private HandshakePacket handshake; private boolean welcomePkgReceived = false; - private AsynTaskCallBack callBack; - private ErrorPacket errPkg; - - - public BackendConCreateTask(MySQLSession session,MySQLDataSource ds) { - - - prevNetMode = session.netOptMode; - session.netOptMode = UserProxySession.NetOptMode.BackendRW; - this.session = session; - // 保存之前的FrontBuffer,BackendCon收到的数据会写入到session.frontBuffer中 - this.prevFrontBuffer = session.frontBuffer; - this.prevBackendBuffer = session.backendBuffer; - session.frontBuffer = session.allocNewProxyBuffer(); - session.backendBuffer = session.allocNewProxyBuffer(); - prevProxyHandler = session.getCurNIOHandler(); + + public BackendConCreateTask(MySQLSession session, MySQLDataSource ds) { + + super(session); } @Override public void onBackendRead(MySQLSession session) throws IOException { // 不透传的状态下,需要自己控制Buffer的状态,这里每次从Socket中读取并写Buffer数据都切回初始Write状态 session.frontBuffer.reset(); - if (!session.readSocket(false) + if (!session.readFromChannel(session.frontBuffer, session.backendChannel) || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 return; } @@ -96,10 +75,10 @@ public void onBackendRead(MySQLSession session) throws IOException { packet.database = schema; // 不透传的状态下,需要自己控制Buffer的状态,这里每次写数据都切回初始Write状态 - session.backendBuffer.reset(); - packet.write(session.backendBuffer); - session.backendBuffer.flip(); - session.writeToChannel(session.backendBuffer, session.backendChannel); + session.frontBuffer.reset(); + packet.write(session.frontBuffer); + session.frontBuffer.flip(); + session.writeToChannel(session.frontBuffer, session.backendChannel); welcomePkgReceived = true; } else { // 认证结果报文收到 @@ -115,15 +94,6 @@ public void onBackendRead(MySQLSession session) throws IOException { } } - @Override - public void onBackendWrite(MySQLSession session) throws IOException { - } - - @Override - public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { - - } - @Override public void onBackendConnect(MySQLSession userSession, boolean success, String msg) throws IOException { String logInfo = success ? " backend connect success " : "backend connect failed " + msg; @@ -144,24 +114,11 @@ public void onBackendConnect(MySQLSession userSession, boolean success, String m } } - private void finished(boolean success) throws IOException { - sessionRecover(); + protected void onFinished(boolean success) { if (!success) { session.backendChannel = null; session.backendKey = null; } - callBack.finished(session, this, success, this.errPkg); - } - - public void sessionRecover() { - // 释放先前分配的资源 - session.recycleAllocedBuffer(session.frontBuffer); - session.recycleAllocedBuffer(session.backendBuffer); - // 恢复Session原来的状态 - session.frontBuffer = prevFrontBuffer; - session.backendBuffer = prevBackendBuffer; - session.netOptMode = prevNetMode; - session.setCurNIOHandler(prevProxyHandler); } private static byte[] passwd(String pass, HandshakePacket hs) throws NoSuchAlgorithmException { @@ -177,12 +134,6 @@ private static byte[] passwd(String pass, HandshakePacket hs) throws NoSuchAlgor return SecurityUtil.scramble411(passwd, seed); } - @Override - public void setCallback(AsynTaskCallBack callBack) { - this.callBack = callBack; - - } - private static long initClientFlags() { int flag = 0; flag |= Capabilities.CLIENT_LONG_PASSWORD; @@ -210,4 +161,9 @@ private static long initClientFlags() { return flag; } + @Override + public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { + logger.warn("not handlered back connection close event ,session " + userSession.getSessionId()); + } + } diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTask.java index 7a4e1aa..2b260d4 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendIOTask.java @@ -1,7 +1,7 @@ package io.mycat.mycat2.tasks; +import io.mycat.mycat2.MySQLSession; import io.mycat.proxy.BackendIOHandler; -import io.mycat.proxy.UserProxySession; /** * 子任务,在某些NIOProxyHandler中会使用,比如获取后端连接,同步后端连接 @@ -9,7 +9,7 @@ * @author wuzhihui * */ -public interface BackendIOTask extends BackendIOHandler{ +public interface BackendIOTask extends BackendIOHandler{ /** * 任务完成后回调 diff --git a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java index d49dda5..89dca7a 100644 --- a/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java +++ b/source/src/main/java/io/mycat/mycat2/tasks/BackendSynchronzationTask.java @@ -6,6 +6,8 @@ import io.mycat.mysql.packet.QueryPacket; import io.mycat.proxy.NIOHandler; import io.mycat.proxy.ProxyBuffer; +import io.mycat.proxy.UserProxySession.NetOptMode; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -16,14 +18,10 @@ *

* 同步状态至后端数据库,包括:字符集,事务,隔离级别等 */ -public class BackendSynchronzationTask implements BackendIOTask { - private NIOHandler prevProxyHandler; +public class BackendSynchronzationTask extends AbstractBackendIOTask { private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class); - private AsynTaskCallBack callBack; - private MySQLSession session; private static QueryPacket[] CMDS = new QueryPacket[3]; private int processCmd = 0; - private ErrorPacket errPkg; static { QueryPacket isolationSynCmd = new QueryPacket(); @@ -41,9 +39,8 @@ public class BackendSynchronzationTask implements BackendIOTask { } public BackendSynchronzationTask(MySQLSession session) throws IOException { - prevProxyHandler = session.getCurNIOHandler(); + super(session); this.processCmd = 0; - this.session = session; syncState(session); } @@ -70,15 +67,10 @@ private void syncState(MySQLSession session) throws IOException { } - @Override - public void onBackendConnect(MySQLSession userSession, boolean success, String msg) throws IOException { - - } - @Override public void onBackendRead(MySQLSession session) throws IOException { session.frontBuffer.reset(); - if (!session.readSocket(false) + if (!session.readFromChannel(session.frontBuffer, session.backendChannel) || !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整 return; } @@ -93,23 +85,9 @@ public void onBackendRead(MySQLSession session) throws IOException { } } - private void finished(boolean success) throws IOException { - session.setCurNIOHandler(prevProxyHandler); - callBack.finished(session, this, success, this.errPkg); - } - - @Override - public void onBackendWrite(MySQLSession session) throws IOException { - - } - @Override public void onBackendSocketClosed(MySQLSession userSession, boolean normal) { - + logger.warn(" socket closed not handlerd" + session.toString()); } - @Override - public void setCallback(AsynTaskCallBack callBack) { - this.callBack = callBack; - } } diff --git a/source/src/main/java/io/mycat/proxy/AbstractSession.java b/source/src/main/java/io/mycat/proxy/AbstractSession.java index 0f6c9e3..7ff44aa 100644 --- a/source/src/main/java/io/mycat/proxy/AbstractSession.java +++ b/source/src/main/java/io/mycat/proxy/AbstractSession.java @@ -98,17 +98,26 @@ public void close(String message) { * @param channel * @return 读取了多少数据 */ - public int readFromChannel(ProxyBuffer proxyBuf, SocketChannel channel) throws IOException { + public boolean readFromChannel(ProxyBuffer proxyBuf, SocketChannel channel) throws IOException { + ByteBuffer buffer = proxyBuf.getBuffer(); buffer.limit(proxyBuf.writeState.optLimit); buffer.position(proxyBuf.writeState.optPostion); int readed = channel.read(buffer); + logger.debug(" readed {} total bytes ,channel {}", readed,channel); proxyBuf.writeState.curOptedLength = readed; if (readed > 0) { proxyBuf.writeState.optPostion += readed; proxyBuf.writeState.optedTotalLength += readed; - } - return readed; + proxyBuf.readState.optLimit = proxyBuf.writeState.optPostion; + }else if (readed == -1) { + logger.warn("Read EOF ,socket closed "); + throw new ClosedChannelException(); + }else if(readed==0) + { + logger.warn("readed zero bytes ,Maybe a bug ,please fix it !!!!"); + } + return readed>0; } /** diff --git a/source/src/main/java/io/mycat/proxy/DefaultDirectProxyHandler.java b/source/src/main/java/io/mycat/proxy/DefaultDirectProxyHandler.java index a15a226..69263d3 100644 --- a/source/src/main/java/io/mycat/proxy/DefaultDirectProxyHandler.java +++ b/source/src/main/java/io/mycat/proxy/DefaultDirectProxyHandler.java @@ -14,8 +14,9 @@ * @author wuzhihui * */ -public class DefaultDirectProxyHandler implements FrontIOHandler ,BackendIOHandler{ +public class DefaultDirectProxyHandler implements FrontIOHandler, BackendIOHandler { protected static Logger logger = LoggerFactory.getLogger(DefaultDirectProxyHandler.class); + public void onBackendConnect(T userSession, boolean success, String msg) throws IOException { String logInfo = success ? " backend connect success " : "backend connect failed " + msg; logger.info(logInfo + " channel " + userSession.backendChannel); @@ -74,10 +75,8 @@ protected void onSocketException(UserProxySession userSession, Exception excepti public void onFrontRead(T userSession) throws IOException { - int readed = userSession.readFromChannel(userSession.backendBuffer, userSession.frontChannel); - if (readed == -1) { - userSession.closeSocket(userSession.frontChannel, true, "read EOF."); - } else if (readed > 0) { + boolean readed = userSession.readFromChannel(userSession.backendBuffer, userSession.frontChannel); + if (readed) { // 如果读到数据,修改NIO事件,自己不再读数据,对方则感兴趣写数据。 userSession.backendBuffer.flip(); userSession.modifySelectKey(); @@ -85,10 +84,8 @@ public void onFrontRead(T userSession) throws IOException { } public void onBackendRead(T userSession) throws IOException { - int readed = userSession.readFromChannel(userSession.frontBuffer, userSession.backendChannel); - if (readed == -1) { - userSession.closeSocket(userSession.backendChannel, true, "read EOF."); - } else if (readed > 0) { + boolean readed = userSession.readFromChannel(userSession.frontBuffer, userSession.backendChannel); + if (readed) { // 如果读到数据,修改NIO事件,自己不再读数据,对方则感兴趣写数据。 userSession.frontBuffer.flip(); userSession.modifySelectKey(); diff --git a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java index 1531fe2..ef0e997 100644 --- a/source/src/main/java/io/mycat/proxy/ProxyBuffer.java +++ b/source/src/main/java/io/mycat/proxy/ProxyBuffer.java @@ -64,7 +64,7 @@ public void setInReading(boolean inReading) { * 交换Read与Write状态 */ public void flip() { - + if (this.inReading) { // 转为可写状态 inReading = false; @@ -88,12 +88,14 @@ public void flip() { //总字节数转换为0 readState.optedTotalLength = 0; } - logger.debug("flip, new state {} , write state: {} ,read state {}", this.inReading?"read":"write", this.writeState,this.readState); - + logger.debug("flip, new state {} , write state: {} ,read state {}", this.inReading ? "read" : "write", + this.writeState, this.readState); + } /** * 只能用在读状态下,跳过指定的N个字符 + * * @param step */ public void skip(int step) { @@ -108,14 +110,6 @@ public BufferOptState getWriteOptState() { return this.writeState; } - /** - * 更新读状态,当Buffer中写入了一些数据后,延伸读的Limit值 - */ - public BufferOptState updateReadLimit() { - this.readState.optLimit = this.writeState.optPostion; - return readState; - } - /** * 写状态时候,如果数据写满了,可以调用此方法移除之前的旧数据 */ @@ -448,15 +442,15 @@ public ProxyBuffer putLenencBytes(int index, byte[] bytes) { * Reset to write状态,清除数据 */ public void reset() { - inReading=false; - writeState.optPostion=0; - writeState.optLimit=buffer.capacity(); - writeState.curOptedLength=0; - writeState.optedTotalLength=0; - readState.optPostion=0; - readState.optLimit=0; - readState.curOptedLength=0; - readState.optedTotalLength=0; + inReading = false; + writeState.optPostion = 0; + writeState.optLimit = buffer.capacity(); + writeState.curOptedLength = 0; + writeState.optedTotalLength = 0; + readState.optPostion = 0; + readState.optLimit = 0; + readState.curOptedLength = 0; + readState.optedTotalLength = 0; } } \ No newline at end of file diff --git a/source/src/main/java/io/mycat/proxy/UserProxySession.java b/source/src/main/java/io/mycat/proxy/UserProxySession.java index 71b874e..5ec8c83 100644 --- a/source/src/main/java/io/mycat/proxy/UserProxySession.java +++ b/source/src/main/java/io/mycat/proxy/UserProxySession.java @@ -9,7 +9,9 @@ /** * 代表用户的会话,存放用户会话数据,如前端连接,后端连接,状态等数据 * Proxy模式模式下,通常一端的Socket会把读到的数据写入到对端的Buffer里,即前端收到的数据放入到BackendBuffer,随后被Backchannel写入并发送出去; - * 类似的,后端收到的数据会放入到frontBuffer里,随后被frontChannel写入并发送出去。 + * 类似的,后端收到的数据会放入到frontBuffer里,随后被frontChannel写入并发送出去。 【特别注意】 + * 如果只读写前端,那么读取到的数据应该放到前端buffer,写入的数据也放到前端Buffer 【特别注意】 + * 如果只读写后端,那么读取到的数据应该放到后端buffer,读取的数据也放到后端Buffer 否则 modifySelectKey()逻辑就错误了 * * @author wuzhihui * @@ -102,44 +104,42 @@ public void closeSocket(SocketChannel channel, boolean normal, String msg) { */ public void modifySelectKey() throws ClosedChannelException { - boolean frontKeyNeedUpdate = false; - boolean backKeyNeedUpdate = false; switch (netOptMode) { - //检查是否为透传模式 case DirectTrans: { - frontKeyNeedUpdate = true; - backKeyNeedUpdate = true; + if (frontKey != null && frontKey.isValid()) { + int clientOps = 0; + if (backendBuffer.isInWriting()) + clientOps |= SelectionKey.OP_READ; + if (frontBuffer.isInWriting() == false) + clientOps |= SelectionKey.OP_WRITE; + frontKey.interestOps(clientOps); + } + if (backendKey != null && backendKey.isValid()) { + int serverOps = 0; + if (frontBuffer.isInWriting()) + serverOps |= SelectionKey.OP_READ; + if (backendBuffer.isInWriting() == false) + serverOps |= SelectionKey.OP_WRITE; + backendKey.interestOps(serverOps); + } break; } - //只处理前端读写 case FrontRW: { - frontKeyNeedUpdate = true; - backKeyNeedUpdate = false; - break; - } - //只处理后端读写 - case BackendRW: { - frontKeyNeedUpdate = false; - backKeyNeedUpdate = true; - break; - } - } - if (frontKeyNeedUpdate && frontKey != null && frontKey.isValid()) { - int clientOps = 0; - if (backendBuffer.isInWriting()) - clientOps |= SelectionKey.OP_READ; + int clientOps = SelectionKey.OP_READ; if (frontBuffer.isInWriting() == false) - clientOps |= SelectionKey.OP_WRITE; + clientOps = SelectionKey.OP_WRITE; frontKey.interestOps(clientOps); + break; } - if (backKeyNeedUpdate && backendKey != null && backendKey.isValid()) { - int serverOps = 0; - if (frontBuffer.isInWriting()) - serverOps |= SelectionKey.OP_READ; + case BackendRW: { + int serverOps = SelectionKey.OP_READ; if (backendBuffer.isInWriting() == false) - serverOps |= SelectionKey.OP_WRITE; + serverOps = SelectionKey.OP_WRITE; backendKey.interestOps(serverOps); + break; + } } + } } diff --git a/source/src/main/java/io/mycat/proxy/man/AdminSession.java b/source/src/main/java/io/mycat/proxy/man/AdminSession.java index 03916f9..2b64f48 100644 --- a/source/src/main/java/io/mycat/proxy/man/AdminSession.java +++ b/source/src/main/java/io/mycat/proxy/man/AdminSession.java @@ -88,18 +88,7 @@ public byte receivedPacket() throws IOException { * @throws IOException */ public boolean readSocket() throws IOException { - int readed = readFromChannel(this.frontBuffer, this.frontChannel); - logger.debug("readed {} total bytes ", readed); - if (readed == -1) { - closeSocket(frontChannel, true, "read EOF."); - return false; - } else if (readed == 0) { - logger.warn("read 0 bytes ,try compact buffer ,session Id :" + this.getSessionId()); - frontBuffer.compact(true); - return false; - } - frontBuffer.updateReadLimit(); - return true; + return readFromChannel(this.frontBuffer, this.frontChannel); } public void closeSocket(SocketChannel channel, boolean normal, String msg) { @@ -112,7 +101,7 @@ public void closeSocket(SocketChannel channel, boolean normal, String msg) { channel.close(); } catch (IOException e) { } - ((FrontIOHandler)this.getCurNIOHandler()).onFrontSocketClosed(this, normal); + ((FrontIOHandler) this.getCurNIOHandler()).onFrontSocketClosed(this, normal); } diff --git a/source/src/main/java/io/mycat/proxy/man/MyCluster.java b/source/src/main/java/io/mycat/proxy/man/MyCluster.java index c91bca2..a920d05 100644 --- a/source/src/main/java/io/mycat/proxy/man/MyCluster.java +++ b/source/src/main/java/io/mycat/proxy/man/MyCluster.java @@ -85,8 +85,8 @@ public void initCluster() { InetSocketAddress serverAddress = new InetSocketAddress(curNode.ip, curNode.port); SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); - socketChannel.connect(serverAddress); socketChannel.register(nioSelector, SelectionKey.OP_CONNECT, curNode.id); + socketChannel.connect(serverAddress); } catch (Exception e) { logger.warn("connect err " + e); }