Skip to content

Commit

Permalink
状态同步改成一条命令
Browse files Browse the repository at this point in the history
  • Loading branch information
ynfeng committed Aug 19, 2017
1 parent 12b9147 commit 0c6eb1e
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 75 deletions.
14 changes: 10 additions & 4 deletions source/src/main/java/io/mycat/mycat2/MySQLSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.nio.channels.SocketChannel;

import io.mycat.mycat2.beans.*;
import io.mycat.mysql.AutoCommit;
import io.mycat.mysql.Capabilities;
import io.mycat.mysql.Isolation;
import io.mycat.mysql.packet.HandshakePacket;
Expand Down Expand Up @@ -50,6 +51,11 @@ public class MySQLSession extends UserProxySession {
*/
public Isolation isolation = Isolation.REPEATED_READ;

/**
* 事务提交方式
*/
public AutoCommit autoCommit = AutoCommit.OFF;

/**
* 认证中的seed报文数据
*/
Expand Down Expand Up @@ -82,7 +88,7 @@ protected int getServerCapabilities() {

/**
* 回应客户端(front或Sever)OK 报文。
*
*
* @param pkg
* ,必须要是OK报文或者Err报文
* @throws IOException
Expand All @@ -103,7 +109,7 @@ public void responseOKOrError(MySQLPacket pkg, boolean front) throws IOException

/**
* 给客户端(front)发送认证报文
*
*
* @throws IOException
*/
public void sendAuthPackge() throws IOException {
Expand Down Expand Up @@ -141,7 +147,7 @@ public MySQLSession(BufferPool bufPool, Selector nioSelector, SocketChannel fron

/**
* 向前端发送数据报文,需要先确定为Write状态并确保写入位置的正确(frontBuffer.writeState)
*
*
* @param rawPkg
* @throws IOException
*/
Expand All @@ -154,7 +160,7 @@ public void answerFront(byte[] rawPkg) throws IOException {
/**
* 解析MySQL报文,解析的结果存储在curMSQLPackgInf中,如果解析到完整的报文,就返回TRUE
* 如果解析的过程中同时要移动ProxyBuffer的readState位置,即标记为读过,后继调用开始解析下一个报文,则需要参数markReaded=true
*
*
* @param proxyBuf
* @return
* @throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ public void onBackendRead(MySQLSession session) throws IOException {

ProxyBuffer backendBuffer = session.frontBuffer;

if (session.resolveMySQLPackage(backendBuffer, session.curFrontMSQLPackgInf, false) == false||!session.curFrontMSQLPackgInf.crossBuffer) {
if (session.resolveMySQLPackage(backendBuffer, session.curFrontMSQLPackgInf, false) == false && !session.curFrontMSQLPackgInf.crossBuffer) {
// 没有读到完整报文, 也不是挎包
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,81 +11,55 @@
import io.mycat.mysql.packet.QueryPacket;
import io.mycat.proxy.ProxyBuffer;

import javax.management.Query;

/**
* Created by ynfeng on 2017/8/13.
* <p>
* 同步状态至后端数据库,包括:字符集,事务,隔离级别等
*/
public class BackendSynchronzationTask extends AbstractBackendIOTask {
private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class);
private static QueryPacket[] CMDS = new QueryPacket[3];
private int processCmd = 0;

static {
QueryPacket isolationSynCmd = new QueryPacket();
isolationSynCmd.packetId = 0;

QueryPacket charsetSynCmd = new QueryPacket();
charsetSynCmd.packetId = 0;

QueryPacket transactionSynCmd = new QueryPacket();
transactionSynCmd.packetId = 0;

CMDS[0] = isolationSynCmd;
CMDS[1] = charsetSynCmd;
CMDS[2] = transactionSynCmd;
}

public BackendSynchronzationTask(MySQLSession session) throws IOException {
super(session);
this.processCmd = 0;
syncState(session);
}

private void syncState(MySQLSession session) throws IOException {
logger.info("synchronzation state to bakcend.session=" + session.toString());
ProxyBuffer frontBuffer = session.frontBuffer;
frontBuffer.reset();
// TODO 字符集映射和前端事务设置还未完成,这里只用隔离级别模拟实现(其实都是SET xxx效果一样),回头补充
switch (processCmd) {
case 1:
case 2:
case 0:
CMDS[processCmd].sql = session.isolation.getCmd();
CMDS[processCmd].write(frontBuffer);
private static Logger logger = LoggerFactory.getLogger(BackendSynchronzationTask.class);

frontBuffer.flip();
session.writeToChannel(frontBuffer, session.backendChannel);
processCmd++;
break;
default:
this.finished(true);
break;
}
public BackendSynchronzationTask(MySQLSession session) throws IOException {
super(session);
syncState(session);
}

}
private void syncState(MySQLSession session) throws IOException {
logger.info("synchronzation state to bakcend.session=" + session.toString());
ProxyBuffer frontBuffer = session.frontBuffer;
frontBuffer.reset();
// TODO 字符集映射未完成
QueryPacket queryPacket = new QueryPacket();
queryPacket.packetId = 0;
queryPacket.sql = session.isolation.getCmd() + session.autoCommit.getCmd() + session.isolation.getCmd();
queryPacket.write(frontBuffer);
frontBuffer.flip();
session.writeToChannel(frontBuffer, session.backendChannel);
}

@Override
public void onBackendRead(MySQLSession session) throws IOException {
session.frontBuffer.reset();
if (!session.readFromChannel(session.frontBuffer, session.backendChannel)
|| !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整
return;
}
if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) {
syncState(session);
} else {
// TODO 同步失败如何处理??是否应该关闭此连接??
errPkg = new ErrorPacket();
errPkg.read(session.frontBuffer);
logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message);
this.finished(false);
}
}
@Override
public void onBackendRead(MySQLSession session) throws IOException {
session.frontBuffer.reset();
if (!session.readFromChannel(session.frontBuffer, session.backendChannel)
|| !session.resolveMySQLPackage(session.frontBuffer, session.curBackendMSQLPackgInf, false)) {// 没有读到数据或者报文不完整
return;
}
if (session.curBackendMSQLPackgInf.pkgType == MySQLPacket.OK_PACKET) {
session.frontBuffer.reset();
this.finished(true);
} else {
errPkg = new ErrorPacket();
errPkg.read(session.frontBuffer);
logger.warn("backend state sync Error.Err No. " + errPkg.errno + "," + errPkg.message);
this.finished(false);
}
}

@Override
public void onBackendSocketClosed(MySQLSession userSession, boolean normal) {
logger.warn(" socket closed not handlerd" + session.toString());
}
@Override
public void onBackendSocketClosed(MySQLSession userSession, boolean normal) {
logger.warn(" socket closed not handlerd" + session.toString());
}

}
19 changes: 19 additions & 0 deletions source/src/main/java/io/mycat/mysql/AutoCommit.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.mycat.mysql;

/**
* Created by ynfeng on 2017/8/19.
*/
public enum AutoCommit {
ON("SET autocommit = 1;"),
OFF("SET autocommit = 0;");

AutoCommit(String cmd) {
this.cmd = cmd;
}

private String cmd;

public String getCmd() {
return cmd;
}
}
4 changes: 0 additions & 4 deletions source/src/main/java/io/mycat/mysql/Isolation.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,5 @@ public enum Isolation {
public String getCmd() {
return cmd;
}

public void setCmd(String cmd) {
this.cmd = cmd;
}
}

0 comments on commit 0c6eb1e

Please sign in to comment.