Skip to content

Commit

Permalink
[Improve][CDC] support exactly-once of cdc, fix the BinlogOffset comp…
Browse files Browse the repository at this point in the history
…aring bug
  • Loading branch information
happyboy1024 committed Jul 10, 2023
1 parent d3462ec commit 5482b52
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,11 @@ public void close() {

private boolean isChangeRecordInChunkRange(SourceRecord record) {
if (taskContext.isDataChangeRecord(record)) {
// fix the between condition
return taskContext.isRecordBetween(
record,
null == currentSnapshotSplit.getSplitStart()
? null
: new Object[] {currentSnapshotSplit.getSplitStart()},
null == currentSnapshotSplit.getSplitEnd()
? null
: new Object[] {currentSnapshotSplit.getSplitEnd()});
currentSnapshotSplit.getSplitStart(),
currentSnapshotSplit.getSplitEnd());
}
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.connectors.cdc.base.source.offset.Offset;
import org.apache.seatunnel.connectors.cdc.base.source.split.CompletedSnapshotSplitInfo;
import org.apache.seatunnel.connectors.cdc.base.source.split.IncrementalSplit;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceRecords;
import org.apache.seatunnel.connectors.cdc.base.source.split.SourceSplitBase;
Expand All @@ -32,8 +33,12 @@
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
Expand All @@ -49,6 +54,8 @@
public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, SourceSplitBase> {
private final FetchTask.Context taskContext;
private final ExecutorService executorService;
// has entered pure binlog mode
private final Set<TableId> pureBinlogPhaseTables;
private volatile ChangeEventQueue<DataChangeEvent> queue;
private volatile Throwable readException;

Expand All @@ -58,13 +65,19 @@ public class IncrementalSourceStreamFetcher implements Fetcher<SourceRecords, So

private Offset splitStartWatermark;

// maximum watermark for each table
private Map<TableId, Offset> maxSplitHighWatermarkMap;
// finished spilt info
private Map<TableId, List<CompletedSnapshotSplitInfo>> finishedSplitsInfo;

private static final long READER_CLOSE_TIMEOUT_SECONDS = 30L;

public IncrementalSourceStreamFetcher(FetchTask.Context taskContext, int subTaskId) {
this.taskContext = taskContext;
ThreadFactory threadFactory =
new ThreadFactoryBuilder().setNameFormat("debezium-reader-" + subTaskId).build();
this.executorService = Executors.newSingleThreadExecutor(threadFactory);
this.pureBinlogPhaseTables = new HashSet<>();
}

@Override
Expand Down Expand Up @@ -157,14 +170,72 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
tableId);
return position.isAfter(splitStartWatermark);
}
// TODO only the table who captured snapshot splits need to filter( Used to support
// Exactly-Once )
return position.isAfter(splitStartWatermark);
// check whether the pure binlog mode has been entered
if (hasEnterPureBinlogPhase(tableId, position)) {
return true;
}
// not enter pure binlog mode and need to check whether the current record meets the emitting
// conditions.
if (finishedSplitsInfo.containsKey(tableId)) {
for (CompletedSnapshotSplitInfo splitInfo : finishedSplitsInfo.get(tableId)) {
if (taskContext.isRecordBetween(
sourceRecord,
splitInfo.getSplitStart(),
splitInfo.getSplitEnd())
&& position.isAfter(splitInfo.getWatermark().getHighWatermark())) {
return true;
}
}
}
return false;
}
return true;
}

private boolean hasEnterPureBinlogPhase(TableId tableId, Offset position) {
// only the table who captured snapshot splits need to filter
if (pureBinlogPhaseTables.contains(tableId)) {
return true;
}
// the existed tables those have finished snapshot reading
if (maxSplitHighWatermarkMap.containsKey(tableId)
&& position.isAtOrAfter(maxSplitHighWatermarkMap.get(tableId))) {
pureBinlogPhaseTables.add(tableId);
return true;
}
return false;
}

private void configureFilter() {
splitStartWatermark = currentIncrementalSplit.getStartupOffset();
Map<TableId, List<CompletedSnapshotSplitInfo>> splitsInfoMap = new HashMap<>();
Map<TableId, Offset> tableIdBinlogPositionMap = new HashMap<>();
List<CompletedSnapshotSplitInfo> completedSnapshotSplitInfos =
currentIncrementalSplit.getCompletedSnapshotSplitInfos();

// latest-offset mode
if (completedSnapshotSplitInfos.isEmpty()) {
for (TableId tableId : currentIncrementalSplit.getTableIds()) {
tableIdBinlogPositionMap.put(tableId, currentIncrementalSplit.getStartupOffset());
}
}

// calculate the max high watermark of every table
for (CompletedSnapshotSplitInfo finishedSplitInfo : completedSnapshotSplitInfos) {
TableId tableId = finishedSplitInfo.getTableId();
List<CompletedSnapshotSplitInfo> list =
splitsInfoMap.getOrDefault(tableId, new ArrayList<>());
list.add(finishedSplitInfo);
splitsInfoMap.put(tableId, list);

Offset highWatermark = finishedSplitInfo.getWatermark().getHighWatermark();
Offset maxHighWatermark = tableIdBinlogPositionMap.get(tableId);
if (maxHighWatermark == null || highWatermark.isAfter(maxHighWatermark)) {
tableIdBinlogPositionMap.put(tableId, highWatermark);
}
}
this.finishedSplitsInfo = splitsInfoMap;
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;
this.pureBinlogPhaseTables.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,13 @@ public int compareTo(Offset offset) {
// compared ...
long timestamp = this.getTimestamp();
long targetTimestamp = that.getTimestamp();
return Long.compare(timestamp, targetTimestamp);
// Timestamps are presupposes that they exist,
// because timestamps do not exist for low watermark and high watermark.
// If not judging here results in the really binlog offset comparison to watermark
// always being true.
if (timestamp != 0 && targetTimestamp != 0) {
return Long.compare(timestamp, targetTimestamp);
}
}

// First compare the MySQL binlog filenames
Expand Down

0 comments on commit 5482b52

Please sign in to comment.