Skip to content

Commit

Permalink
optimize gtid
Browse files Browse the repository at this point in the history
  • Loading branch information
agapple committed Sep 7, 2023
1 parent 34e9440 commit d4a0970
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,7 @@ public void putGtid(GTIDSet gtidSet, LogEvent gtidEvent) {
if (gtidSet != null) {
gtidMap.put(GTID_SET_STRING, gtidSet.toString());
if (gtidEvent != null && gtidEvent instanceof GtidLogEvent) {
GtidLogEvent event = (GtidLogEvent)gtidEvent;
GtidLogEvent event = (GtidLogEvent) gtidEvent;
gtidMap.put(CURRENT_GTID_STRING, event.getGtidStr());
gtidMap.put(CURRENT_GTID_SN, String.valueOf(event.getSequenceNumber()));
gtidMap.put(CURRENT_GTID_LAST_COMMIT, String.valueOf(event.getLastCommitted()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,21 @@
package com.alibaba.otter.canal.parse.driver.mysql.packets;

import org.apache.commons.lang.StringUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;

/**
* 类 MariaGTIDSet.java 的实现
*
* @author winger 2020/9/24 10:31 上午
* @version 1.0.0
*/
public class MariaGTIDSet implements GTIDSet {

//MariaDB 10.0.2+ representation of Gtid
Map<Long, MariaGtid> gtidMap = new HashMap<>();
private Map<Long, MariaGtid> gtidMap = new HashMap<>();

@Override
public byte[] encode() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ public class GtidUtil {
public static GTIDSet parseGtidSet(String gtid, boolean isMariaDB) {
if (isMariaDB) {
return MariaGTIDSet.parse(gtid);
} else {
return MysqlGTIDSet.parse(gtid);
}
return MysqlGTIDSet.parse(gtid);
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,12 @@
package com.alibaba.otter.canal.parse.inbound;

import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang.math.RandomUtils;
Expand All @@ -25,7 +21,7 @@
import com.alibaba.otter.canal.parse.driver.mysql.packets.GTIDSet;
import com.alibaba.otter.canal.parse.exception.CanalParseException;
import com.alibaba.otter.canal.parse.exception.PositionNotFoundException;
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection;
import com.alibaba.otter.canal.parse.inbound.mysql.MysqlMultiStageCoprocessor;
import com.alibaba.otter.canal.parse.index.CanalLogPositionManager;
import com.alibaba.otter.canal.parse.support.AuthenticationInfo;
Expand All @@ -38,8 +34,7 @@
import com.alibaba.otter.canal.protocol.position.LogPosition;
import com.alibaba.otter.canal.sink.CanalEventSink;
import com.alibaba.otter.canal.sink.exception.CanalSinkException;

import static com.alibaba.otter.canal.parse.driver.mysql.utils.GtidUtil.parseGtidSet;
import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException;

/**
* 抽象的EventParser, 最大化共用mysql/oracle版本的实现
Expand Down Expand Up @@ -183,7 +178,7 @@ public void run() {
}

if (erosaConnection instanceof MysqlConnection) {
isMariaDB = ((MysqlConnection)erosaConnection).isMariaDB();
isMariaDB = ((MysqlConnection) erosaConnection).isMariaDB();
}
// 4. 获取最后的位置信息
long start = System.currentTimeMillis();
Expand Down Expand Up @@ -249,7 +244,7 @@ public boolean sink(EVENT event) {
multiStageCoprocessor = buildMultiStageCoprocessor();
if (isGTIDMode() && StringUtils.isNotEmpty(startPosition.getGtid())) {
// 判断所属instance是否启用GTID模式,是的话调用ErosaConnection中GTID对应方法dump数据
GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(),isMariaDB);
GTIDSet gtidSet = parseGtidSet(startPosition.getGtid(), isMariaDB);
((MysqlMultiStageCoprocessor) multiStageCoprocessor).setGtidSet(gtidSet);
multiStageCoprocessor.start();
erosaConnection.dump(gtidSet, multiStageCoprocessor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ public void sendSemiAck(String binlogfilename, Long binlogPosition) throws IOExc
private void sendBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
if (isMariaDB()) {
sendMariaBinlogDumpGTID(gtidSet);
return;
} else {
sendMySQLBinlogDumpGTID(gtidSet);
}
sendMySQLBinlogDumpGTID(gtidSet);
}

private void sendMySQLBinlogDumpGTID(GTIDSet gtidSet) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ private final long generateUniqueServerId() {

protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
if (isGTIDMode()) {
// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instanc配置中的
// GTID模式下,CanalLogPositionManager里取最后的gtid,没有则取instance配置中的
LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
if (logPosition != null) {
// 如果以前是非GTID模式,后来调整为了GTID模式,那么为了保持兼容,需要判断gtid是否为空
Expand Down Expand Up @@ -381,7 +381,7 @@ protected EntryPosition findEndPosition(ErosaConnection connection) throws IOExc
protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
MysqlConnection mysqlConnection = (MysqlConnection) connection;
final EntryPosition endPosition = findEndPosition(mysqlConnection);
if (tableMetaTSDB != null) {
if (tableMetaTSDB != null || isGTIDMode()) {
long startTimestamp = System.currentTimeMillis();
return findAsPerTimestampInSpecificLogFile(mysqlConnection,
startTimestamp,
Expand Down Expand Up @@ -426,7 +426,8 @@ protected EntryPosition findStartPositionInternal(ErosaConnection connection) {
}

if (entryPosition == null) {
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
entryPosition =
findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默认从当前最后一个位置进行消费
}

// 判断一下是否需要按时间订阅
Expand Down

0 comments on commit d4a0970

Please sign in to comment.