diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java index 63873bec2b772..3d3b1d1688065 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java @@ -18,18 +18,18 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; /** * MySQL base binlog event. */ +@RequiredArgsConstructor @Getter -@Setter public abstract class MySQLBaseBinlogEvent { - private String fileName; + private final String fileName; - private long position; + private final long position; - private long timestamp; + private final long timestamp; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java index ecc57d276ca1b..db007c6eb3f8f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java @@ -21,4 +21,8 @@ * Placeholder binlog event, unsupported binlog event will replace it into this class. */ public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent { + + public PlaceholderBinlogEvent(final String fileName, final long position, final long timestamp) { + super(fileName, position, timestamp); + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java index a521588fc6b7a..8b0332a13c027 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query; import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** @@ -29,7 +28,6 @@ * * @see QUERY_EVENT */ -@RequiredArgsConstructor @Getter public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent { @@ -42,4 +40,14 @@ public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent { private final String databaseName; private final String sql; + + public MySQLQueryBinlogEvent(final String fileName, final long position, final long timestamp, + final long threadId, final long executionTime, final int errorCode, final String databaseName, final String sql) { + super(fileName, position, timestamp); + this.threadId = threadId; + this.executionTime = executionTime; + this.errorCode = errorCode; + this.databaseName = databaseName; + this.sql = sql; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java index 0c9d39784beba..dfa8fda01686a 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java @@ -18,17 +18,21 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** * MySQL rows base event. */ -@RequiredArgsConstructor @Getter public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent { private final String databaseName; private final String tableName; + + public MySQLBaseRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName) { + super(fileName, position, timestamp); + this.databaseName = databaseName; + this.tableName = tableName; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java index 50e3d14d6b49f..e08ce8589e84a 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java @@ -30,8 +30,8 @@ public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List beforeRows; - public MySQLDeleteRowsBinlogEvent(final String databaseName, final String tableName, final List beforeRows) { - super(databaseName, tableName); + public MySQLDeleteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List beforeRows) { + super(fileName, position, timestamp, databaseName, tableName); this.beforeRows = beforeRows; } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java index 8c3379ea7860d..e3eef2f03263f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java @@ -32,8 +32,9 @@ public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List afterRows; - public MySQLUpdateRowsBinlogEvent(final String databaseName, final String tableName, final List beforeRows, final List afterRows) { - super(databaseName, tableName); + public MySQLUpdateRowsBinlogEvent(final String fileName, final long position, final long timestamp, + final String databaseName, final String tableName, final List beforeRows, final List afterRows) { + super(fileName, position, timestamp, databaseName, tableName); this.beforeRows = beforeRows; this.afterRows = afterRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java index db1029bfac079..25e73d8e8fc60 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java @@ -30,8 +30,8 @@ public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private final List afterRows; - public MySQLWriteRowsBinlogEvent(final String databaseName, final String tableName, final List afterRows) { - super(databaseName, tableName); + public MySQLWriteRowsBinlogEvent(final String fileName, final long position, final long timestamp, final String databaseName, final String tableName, final List afterRows) { + super(fileName, position, timestamp, databaseName, tableName); this.afterRows = afterRows; } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java index b3c2e2af8856c..6601f3c2ec8cc 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java @@ -18,7 +18,6 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction; import lombok.Getter; -import lombok.RequiredArgsConstructor; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** @@ -26,9 +25,13 @@ * * @see XID_EVENT */ -@RequiredArgsConstructor @Getter public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent { private final long xid; + + public MySQLXidBinlogEvent(final String fileName, final long position, final long timestamp, final long xid) { + super(fileName, position, timestamp); + this.xid = xid; + } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java index 7ba01844b640a..4436f14e6e281 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java @@ -221,18 +221,11 @@ private void dumpBinlog(final String binlogFileName, final long binlogPosition, channel.pipeline().remove(MySQLCommandResponseHandler.class); String tableKey = String.join(":", connectInfo.getHost(), String.valueOf(connectInfo.getPort())); channel.pipeline().addLast(new MySQLBinlogEventPacketDecoder(checksumLength, GlobalTableMapEventMapping.getTableMapEventMap(tableKey), decodeWithTX)); - channel.pipeline().addLast(new MySQLBinlogEventHandler(getLastBinlogEvent(binlogFileName, binlogPosition))); + channel.pipeline().addLast(new MySQLBinlogEventHandler(new PlaceholderBinlogEvent(binlogFileName, binlogPosition, 0L))); resetSequenceID(); channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName)); } - private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) { - PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(); - result.setFileName(binlogFileName); - result.setPosition(binlogPosition); - return result; - } - private void resetSequenceID() { channel.attr(MySQLConstants.MYSQL_SEQUENCE_ID).get().set(0); } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index 25f7919687c30..edc6021a28c67 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -25,10 +25,9 @@ import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent; @@ -201,37 +200,28 @@ private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventH MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); packet.readRows(tableMapEventPacket, payload); - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); - initRowsEvent(result, binlogEventHeader); - return result; + return new MySQLWriteRowsBinlogEvent(binlogContext.getFileName(), + binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); } private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); packet.readRows(tableMapEventPacket, payload); - MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2()); - initRowsEvent(result, binlogEventHeader); - return result; + return new MySQLUpdateRowsBinlogEvent(binlogContext.getFileName(), + binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows(), packet.getRows2()); } private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); MySQLBinlogTableMapEventPacket tableMapEventPacket = binlogContext.getTableMapEvent(packet.getTableId()); packet.readRows(tableMapEventPacket, payload); - MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); - initRowsEvent(result, binlogEventHeader); - return result; - } - - private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader) { - rowsEvent.setFileName(binlogContext.getFileName()); - rowsEvent.setPosition(binlogEventHeader.getLogPos()); - rowsEvent.setTimestamp(binlogEventHeader.getTimestamp()); + return new MySQLDeleteRowsBinlogEvent(binlogContext.getFileName(), + binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), tableMapEventPacket.getSchemaName(), tableMapEventPacket.getTableName(), packet.getRows()); } private PlaceholderBinlogEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { - PlaceholderBinlogEvent result = createPlaceholderEvent(binlogEventHeader); + PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp()); int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex(); if (remainDataLength > 0) { payload.skipReserved(remainDataLength); @@ -247,28 +237,11 @@ private MySQLQueryBinlogEvent decodeQueryEvent(final MySQLBinlogEventHeader binl payload.skipReserved(payload.readInt2()); String databaseName = payload.readStringNul(); String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength()); - MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId, executionTime, errorCode, databaseName, sql); - result.setFileName(binlogContext.getFileName()); - result.setPosition(binlogEventHeader.getLogPos()); - result.setTimestamp(binlogEventHeader.getTimestamp()); - return result; + return new MySQLQueryBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), threadId, executionTime, errorCode, databaseName, sql); } private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { - MySQLXidBinlogEvent result = new MySQLXidBinlogEvent(payload.readInt8()); - result.setFileName(binlogContext.getFileName()); - result.setPosition(binlogEventHeader.getLogPos()); - result.setTimestamp(binlogEventHeader.getTimestamp()); - return result; - } - - // TODO May be used again later, keep this method first. - private PlaceholderBinlogEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) { - PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(); - result.setFileName(binlogContext.getFileName()); - result.setPosition(binlogEventHeader.getLogPos()); - result.setTimestamp(binlogEventHeader.getTimestamp()); - return result; + return new MySQLXidBinlogEvent(binlogContext.getFileName(), binlogEventHeader.getLogPos(), binlogEventHeader.getTimestamp(), payload.readInt8()); } private void skipChecksum(final int eventType, final ByteBuf in) { diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index e7926fe549934..151add22cf79a 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -134,7 +134,7 @@ private List mockOrderColumnsMetaDataList() { @Test void assertWriteRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); + List actual = getRecordsByWriteRowsEvent(new MySQLWriteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.INSERT)); @@ -148,8 +148,8 @@ private List getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent @Test void assertUpdateRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByUpdateRowsEvent( - new MySQLUpdateRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}))); + List actual = getRecordsByUpdateRowsEvent(new MySQLUpdateRowsBinlogEvent( + "", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.UPDATE)); @@ -163,7 +163,7 @@ private List getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEven @Test void assertDeleteRowsEvent() throws ReflectiveOperationException { - List actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); + List actual = getRecordsByDeleteRowsEvent(new MySQLDeleteRowsBinlogEvent("", 0, 0L, "", "t_order", Collections.singletonList(new Serializable[]{101, 1, "OK"}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); assertThat(((DataRecord) actual.get(0)).getType(), is(PipelineSQLOperationType.DELETE)); @@ -178,14 +178,14 @@ private List getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEven @Test void assertPlaceholderEvent() throws ReflectiveOperationException { List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class), - incrementalDumper, new PlaceholderBinlogEvent()); + incrementalDumper, new PlaceholderBinlogEvent("", 0, 0L)); assertThat(actual.size(), is(1)); } @Test void assertRowsEventFiltered() throws ReflectiveOperationException { List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class), - incrementalDumper, new MySQLWriteRowsBinlogEvent("test", "t_order", Collections.singletonList(new Serializable[]{1}))); + incrementalDumper, new MySQLWriteRowsBinlogEvent("", 0, 0L, "test", "t_order", Collections.singletonList(new Serializable[]{1}))); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); }