From cdd6934db186b2befca1754ad47293b3b4f14f05 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 16 Aug 2024 22:46:26 +0800 Subject: [PATCH 1/4] Rename MySQLBaseBinlogEvent --- ...ogEvent.java => MySQLBaseBinlogEvent.java} | 4 +- ...Event.java => PlaceholderBinlogEvent.java} | 2 +- .../MySQLQueryBinlogEvent.java} | 5 +- .../MySQLBaseRowsBinlogEvent.java} | 7 +- .../MySQLDeleteRowsBinlogEvent.java} | 6 +- .../MySQLUpdateRowsBinlogEvent.java} | 6 +- .../MySQLWriteRowsBinlogEvent.java} | 6 +- .../MySQLXidBinlogEvent.java} | 5 +- .../incremental/client/MySQLBinlogClient.java | 24 +++---- .../netty/MySQLBinlogEventPacketDecoder.java | 68 +++++++++---------- .../dumper/MySQLIncrementalDumper.java | 42 ++++++------ .../MySQLBinlogEventPacketDecoderTest.java | 30 ++++---- .../dumper/MySQLIncrementalDumperTest.java | 44 ++++++------ 13 files changed, 126 insertions(+), 123 deletions(-) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{AbstractBinlogEvent.java => MySQLBaseBinlogEvent.java} (93%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{PlaceholderEvent.java => PlaceholderBinlogEvent.java} (93%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{QueryEvent.java => query/MySQLQueryBinlogEvent.java} (88%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{AbstractRowsEvent.java => rows/MySQLBaseRowsBinlogEvent.java} (80%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{DeleteRowsEvent.java => rows/MySQLDeleteRowsBinlogEvent.java} (87%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{UpdateRowsEvent.java => rows/MySQLUpdateRowsBinlogEvent.java} (88%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{WriteRowsEvent.java => rows/MySQLWriteRowsBinlogEvent.java} (87%) rename kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/{XidEvent.java => transaction/MySQLXidBinlogEvent.java} (85%) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java similarity index 93% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java index 350f9bb56c230..63873bec2b772 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/MySQLBaseBinlogEvent.java @@ -21,11 +21,11 @@ import lombok.Setter; /** - * Abstract binlog event. + * MySQL base binlog event. */ @Getter @Setter -public abstract class AbstractBinlogEvent { +public abstract class MySQLBaseBinlogEvent { private String fileName; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java similarity index 93% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java index f37272f462565..ecc57d276ca1b 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/PlaceholderBinlogEvent.java @@ -20,5 +20,5 @@ /** * Placeholder binlog event, unsupported binlog event will replace it into this class. */ -public final class PlaceholderEvent extends AbstractBinlogEvent { +public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent { } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java similarity index 88% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java index 5b3e591dd733f..a521588fc6b7a 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/QueryEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/query/MySQLQueryBinlogEvent.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** * Query event.This event is written into the binary log file for: @@ -30,7 +31,7 @@ */ @RequiredArgsConstructor @Getter -public final class QueryEvent extends AbstractBinlogEvent { +public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent { private final long threadId; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java similarity index 80% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java index 56ecaeeb96370..8bb91ffd8332d 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/AbstractRowsEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLBaseRowsBinlogEvent.java @@ -15,17 +15,18 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; import lombok.Setter; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** - * Abstract rows event. + * MySQL rows base event. */ @Getter @Setter -public abstract class AbstractRowsEvent extends AbstractBinlogEvent { +public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent { private String databaseName; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java similarity index 87% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java index 1d27a8dbd7a42..a2221e988f1f7 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/DeleteRowsEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; import lombok.Setter; @@ -24,11 +24,11 @@ import java.util.List; /** - * Delete rows event. + * MySQL delete rows binlog event. */ @Getter @Setter -public final class DeleteRowsEvent extends AbstractRowsEvent { +public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private List beforeRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java similarity index 88% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java index 0f89a70f39629..0ecdebd3ab092 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/UpdateRowsEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; import lombok.Setter; @@ -24,11 +24,11 @@ import java.util.List; /** - * Update rows event. + * MySQL update rows binlog event. */ @Getter @Setter -public final class UpdateRowsEvent extends AbstractRowsEvent { +public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private List beforeRows; diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java similarity index 87% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java index 35f76361006c8..7c52bb4066be8 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/WriteRowsEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; import lombok.Setter; @@ -24,11 +24,11 @@ import java.util.List; /** - * Write rows event. + * MySQL write rows binlog event. */ @Getter @Setter -public final class WriteRowsEvent extends AbstractRowsEvent { +public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { private List afterRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java similarity index 85% rename from kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java rename to kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java index 9b7589513e1cc..b3c2e2af8856c 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/XidEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/transaction/MySQLXidBinlogEvent.java @@ -15,10 +15,11 @@ * limitations under the License. */ -package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event; +package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction; import lombok.Getter; import lombok.RequiredArgsConstructor; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; /** * XID event is generated for a COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine. @@ -27,7 +28,7 @@ */ @RequiredArgsConstructor @Getter -public final class XidEvent extends AbstractBinlogEvent { +public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent { private final long xid; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java index 3ec1a166257da..7ba01844b640a 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/MySQLBinlogClient.java @@ -34,8 +34,8 @@ import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLCommandPacketDecoder; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLNegotiateHandler; @@ -77,7 +77,7 @@ public final class MySQLBinlogClient { private final boolean decodeWithTX; - private final ArrayBlockingQueue> blockingEventQueue = new ArrayBlockingQueue<>(2500); + private final ArrayBlockingQueue> blockingEventQueue = new ArrayBlockingQueue<>(2500); private EventLoopGroup eventLoopGroup; @@ -226,8 +226,8 @@ private void dumpBinlog(final String binlogFileName, final long binlogPosition, channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName)); } - private AbstractBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) { - PlaceholderEvent result = new PlaceholderEvent(); + private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) { + PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(); result.setFileName(binlogFileName); result.setPosition(binlogPosition); return result; @@ -242,12 +242,12 @@ private void resetSequenceID() { * * @return binlog event */ - public synchronized List poll() { + public synchronized List poll() { if (!running) { return Collections.emptyList(); } try { - List result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS); + List result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS); return null == result ? Collections.emptyList() : result; } catch (final InterruptedException ignored) { Thread.currentThread().interrupt(); @@ -314,11 +314,11 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter { - private final AtomicReference lastBinlogEvent; + private final AtomicReference lastBinlogEvent; private final AtomicBoolean reconnectRequested = new AtomicBoolean(false); - MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) { + MySQLBinlogEventHandler(final MySQLBaseBinlogEvent lastBinlogEvent) { this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent); } @@ -329,7 +329,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw return; } if (msg instanceof List) { - List records = (List) msg; + List records = (List) msg; if (records.isEmpty()) { log.warn("The records is empty"); return; @@ -338,8 +338,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw blockingEventQueue.put(records); return; } - if (msg instanceof AbstractBinlogEvent) { - lastBinlogEvent.set((AbstractBinlogEvent) msg); + if (msg instanceof MySQLBaseBinlogEvent) { + lastBinlogEvent.set((MySQLBaseBinlogEvent) msg); blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get())); } } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index d53c129142191..0b6628d0c34a1 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -24,14 +24,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent; import org.apache.shardingsphere.db.protocol.constant.CommonConstants; import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinlogEventType; import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.MySQLBinlogEventHeader; @@ -58,7 +58,7 @@ public final class MySQLBinlogEventPacketDecoder extends ByteToMessageDecoder { private final boolean decodeWithTX; - private List records = new LinkedList<>(); + private List records = new LinkedList<>(); public MySQLBinlogEventPacketDecoder(final int checksumLength, final Map tableMap, final boolean decodeWithTX) { this.decodeWithTX = decodeWithTX; @@ -75,12 +75,12 @@ protected void decode(final ChannelHandlerContext ctx, final ByteBuf in, final L if (!checkEventIntegrity(in, binlogEventHeader)) { return; } - Optional binlogEvent = decodeEvent(binlogEventHeader, payload); + Optional binlogEvent = decodeEvent(binlogEventHeader, payload); if (!binlogEvent.isPresent()) { skipChecksum(binlogEventHeader.getEventType(), in); return; } - if (binlogEvent.get() instanceof PlaceholderEvent) { + if (binlogEvent.get() instanceof PlaceholderBinlogEvent) { out.add(binlogEvent.get()); skipChecksum(binlogEventHeader.getEventType(), in); return; @@ -120,15 +120,15 @@ private boolean checkEventIntegrity(final ByteBuf in, final MySQLBinlogEventHead return true; } - private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final List out) { - if (binlogEvent instanceof QueryEvent) { - QueryEvent queryEvent = (QueryEvent) binlogEvent; + private void processEventWithTX(final MySQLBaseBinlogEvent binlogEvent, final List out) { + if (binlogEvent instanceof MySQLQueryBinlogEvent) { + MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent) binlogEvent; if (TX_BEGIN_SQL.equals(queryEvent.getSql())) { records = new LinkedList<>(); } else { out.add(binlogEvent); } - } else if (binlogEvent instanceof XidEvent) { + } else if (binlogEvent instanceof MySQLXidBinlogEvent) { records.add(binlogEvent); out.add(records); } else { @@ -136,9 +136,9 @@ private void processEventWithTX(final AbstractBinlogEvent binlogEvent, final Lis } } - private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, final List out) { - if (binlogEvent instanceof QueryEvent) { - QueryEvent queryEvent = (QueryEvent) binlogEvent; + private void processEventIgnoreTX(final MySQLBaseBinlogEvent binlogEvent, final List out) { + if (binlogEvent instanceof MySQLQueryBinlogEvent) { + MySQLQueryBinlogEvent queryEvent = (MySQLQueryBinlogEvent) binlogEvent; if (TX_BEGIN_SQL.equals(queryEvent.getSql())) { return; } @@ -146,7 +146,7 @@ private void processEventIgnoreTX(final AbstractBinlogEvent binlogEvent, final L out.add(binlogEvent); } - private Optional decodeEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + private Optional decodeEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { switch (MySQLBinlogEventType.valueOf(binlogEventHeader.getEventType()).orElse(MySQLBinlogEventType.UNKNOWN_EVENT)) { case ROTATE_EVENT: decodeRotateEvent(binlogEventHeader, payload); @@ -197,35 +197,35 @@ private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader, binlogContext.putTableMapEvent(packet.getTableId(), packet); } - private WriteRowsEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - WriteRowsEvent result = new WriteRowsEvent(); + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); initRowsEvent(result, binlogEventHeader, packet.getTableId()); result.setAfterRows(packet.getRows()); return result; } - private UpdateRowsEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - UpdateRowsEvent result = new UpdateRowsEvent(); + MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(); initRowsEvent(result, binlogEventHeader, packet.getTableId()); result.setBeforeRows(packet.getRows()); result.setAfterRows(packet.getRows2()); return result; } - private DeleteRowsEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - DeleteRowsEvent result = new DeleteRowsEvent(); + MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(); initRowsEvent(result, binlogEventHeader, packet.getTableId()); result.setBeforeRows(packet.getRows()); return result; } - private void initRowsEvent(final AbstractRowsEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) { + private void initRowsEvent(final MySQLBaseRowsBinlogEvent rowsEvent, final MySQLBinlogEventHeader binlogEventHeader, final long tableId) { rowsEvent.setDatabaseName(binlogContext.getTableMapEvent(tableId).getSchemaName()); rowsEvent.setTableName(binlogContext.getTableMapEvent(tableId).getTableName()); rowsEvent.setFileName(binlogContext.getFileName()); @@ -233,8 +233,8 @@ private void initRowsEvent(final AbstractRowsEvent rowsEvent, final MySQLBinlogE rowsEvent.setTimestamp(binlogEventHeader.getTimestamp()); } - private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { - PlaceholderEvent result = createPlaceholderEvent(binlogEventHeader); + private PlaceholderBinlogEvent decodePlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + PlaceholderBinlogEvent result = createPlaceholderEvent(binlogEventHeader); int remainDataLength = binlogEventHeader.getEventSize() + 1 - binlogEventHeader.getChecksumLength() - payload.getByteBuf().readerIndex(); if (remainDataLength > 0) { payload.skipReserved(remainDataLength); @@ -242,7 +242,7 @@ private PlaceholderEvent decodePlaceholderEvent(final MySQLBinlogEventHeader bin return result; } - private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + private MySQLQueryBinlogEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { int threadId = payload.readInt4(); int executionTime = payload.readInt4(); payload.skipReserved(1); @@ -250,15 +250,15 @@ private QueryEvent decodeQueryEvent(final MySQLBinlogEventHeader binlogEventHead payload.skipReserved(payload.readInt2()); String databaseName = payload.readStringNul(); String sql = payload.readStringFix(payload.getByteBuf().readableBytes() - binlogEventHeader.getChecksumLength()); - QueryEvent result = new QueryEvent(threadId, executionTime, errorCode, databaseName, sql); + MySQLQueryBinlogEvent result = new MySQLQueryBinlogEvent(threadId, executionTime, errorCode, databaseName, sql); result.setFileName(binlogContext.getFileName()); result.setPosition(binlogEventHeader.getLogPos()); result.setTimestamp(binlogEventHeader.getTimestamp()); return result; } - private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { - XidEvent result = new XidEvent(payload.readInt8()); + private MySQLXidBinlogEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { + MySQLXidBinlogEvent result = new MySQLXidBinlogEvent(payload.readInt8()); result.setFileName(binlogContext.getFileName()); result.setPosition(binlogEventHeader.getLogPos()); result.setTimestamp(binlogEventHeader.getTimestamp()); @@ -266,8 +266,8 @@ private XidEvent decodeXidEvent(final MySQLBinlogEventHeader binlogEventHeader, } // TODO May be used again later, keep this method first. - private PlaceholderEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) { - PlaceholderEvent result = new PlaceholderEvent(); + private PlaceholderBinlogEvent createPlaceholderEvent(final MySQLBinlogEventHeader binlogEventHeader) { + PlaceholderBinlogEvent result = new PlaceholderBinlogEvent(); result.setFileName(binlogContext.getFileName()); result.setPosition(binlogEventHeader.getLogPos()); result.setTimestamp(binlogEventHeader.getTimestamp()); diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java index 8c15f860ade8f..3f6f2d2720e62 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumper.java @@ -34,11 +34,11 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.data.MySQLBinlogDataHandler; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLBaseRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.ConnectInfo; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.MySQLBinlogClient; import org.apache.shardingsphere.infra.annotation.HighFrequencyInvocation; @@ -103,9 +103,9 @@ protected void runBlocking() { } } - private void handleEvents(final List events) { + private void handleEvents(final List events) { List dataRecords = new LinkedList<>(); - for (AbstractBinlogEvent each : events) { + for (MySQLBaseBinlogEvent each : events) { dataRecords.addAll(handleEvent(each)); } if (!dataRecords.isEmpty()) { @@ -113,28 +113,28 @@ private void handleEvents(final List events) { } } - private List handleEvent(final AbstractBinlogEvent event) { - if (!(event instanceof AbstractRowsEvent)) { + private List handleEvent(final MySQLBaseBinlogEvent event) { + if (!(event instanceof MySQLBaseRowsBinlogEvent)) { return Collections.singletonList(createPlaceholderRecord(event)); } - AbstractRowsEvent rowsEvent = (AbstractRowsEvent) event; + MySQLBaseRowsBinlogEvent rowsEvent = (MySQLBaseRowsBinlogEvent) event; if (!rowsEvent.getDatabaseName().equals(catalog) || !dumperContext.getCommonContext().getTableNameMapper().containsTable(rowsEvent.getTableName())) { return Collections.singletonList(createPlaceholderRecord(event)); } PipelineTableMetaData tableMetaData = getPipelineTableMetaData(rowsEvent.getTableName()); - if (event instanceof WriteRowsEvent) { - return handleWriteRowsEvent((WriteRowsEvent) event, tableMetaData); + if (event instanceof MySQLWriteRowsBinlogEvent) { + return handleWriteRowsEvent((MySQLWriteRowsBinlogEvent) event, tableMetaData); } - if (event instanceof UpdateRowsEvent) { - return handleUpdateRowsEvent((UpdateRowsEvent) event, tableMetaData); + if (event instanceof MySQLUpdateRowsBinlogEvent) { + return handleUpdateRowsEvent((MySQLUpdateRowsBinlogEvent) event, tableMetaData); } - if (event instanceof DeleteRowsEvent) { - return handleDeleteRowsEvent((DeleteRowsEvent) event, tableMetaData); + if (event instanceof MySQLDeleteRowsBinlogEvent) { + return handleDeleteRowsEvent((MySQLDeleteRowsBinlogEvent) event, tableMetaData); } return Collections.emptyList(); } - private PlaceholderRecord createPlaceholderRecord(final AbstractBinlogEvent event) { + private PlaceholderRecord createPlaceholderRecord(final MySQLBaseBinlogEvent event) { PlaceholderRecord result = new PlaceholderRecord(new MySQLBinlogPosition(event.getFileName(), event.getPosition())); result.setCommitTime(event.getTimestamp() * 1000L); return result; @@ -145,7 +145,7 @@ private PipelineTableMetaData getPipelineTableMetaData(final String actualTableN return metaDataLoader.getTableMetaData(dumperContext.getCommonContext().getTableAndSchemaNameMapper().getSchemaName(logicTableName), actualTableName); } - private List handleWriteRowsEvent(final WriteRowsEvent event, final PipelineTableMetaData tableMetaData) { + private List handleWriteRowsEvent(final MySQLWriteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) { List result = new LinkedList<>(); for (Serializable[] each : event.getAfterRows()) { DataRecord dataRecord = createDataRecord(PipelineSQLOperationType.INSERT, event, each.length); @@ -158,7 +158,7 @@ private List handleWriteRowsEvent(final WriteRowsEvent event, final return result; } - private List handleUpdateRowsEvent(final UpdateRowsEvent event, final PipelineTableMetaData tableMetaData) { + private List handleUpdateRowsEvent(final MySQLUpdateRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) { List result = new LinkedList<>(); for (int i = 0; i < event.getBeforeRows().size(); i++) { Serializable[] beforeValues = event.getBeforeRows().get(i); @@ -176,7 +176,7 @@ private List handleUpdateRowsEvent(final UpdateRowsEvent event, fina return result; } - private List handleDeleteRowsEvent(final DeleteRowsEvent event, final PipelineTableMetaData tableMetaData) { + private List handleDeleteRowsEvent(final MySQLDeleteRowsBinlogEvent event, final PipelineTableMetaData tableMetaData) { List result = new LinkedList<>(); for (Serializable[] each : event.getBeforeRows()) { DataRecord dataRecord = createDataRecord(PipelineSQLOperationType.DELETE, event, each.length); @@ -189,7 +189,7 @@ private List handleDeleteRowsEvent(final DeleteRowsEvent event, fina return result; } - private DataRecord createDataRecord(final PipelineSQLOperationType type, final AbstractRowsEvent rowsEvent, final int columnCount) { + private DataRecord createDataRecord(final PipelineSQLOperationType type, final MySQLBaseRowsBinlogEvent rowsEvent, final int columnCount) { String tableName = dumperContext.getCommonContext().getTableNameMapper().getLogicTableName(rowsEvent.getTableName()).toString(); IngestPosition binlogPosition = new MySQLBinlogPosition(rowsEvent.getFileName(), rowsEvent.getPosition()); DataRecord result = new DataRecord(type, tableName, binlogPosition, columnCount); diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java index 9d27fdd8f12e0..b4a6a798658af 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoderTest.java @@ -24,11 +24,11 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.util.internal.StringUtil; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogContext; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.QueryEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.XidEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query.MySQLQueryBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction.MySQLXidBinlogEvent; import org.apache.shardingsphere.db.protocol.constant.CommonConstants; import org.apache.shardingsphere.db.protocol.mysql.constant.MySQLBinaryColumnType; import org.apache.shardingsphere.db.protocol.mysql.packet.binlog.row.MySQLBinlogTableMapEventPacket; @@ -120,9 +120,9 @@ void assertDecodeQueryEvent() { binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents); assertFalse(decodedEvents.isEmpty()); Object actual = decodedEvents.get(0); - assertInstanceOf(QueryEvent.class, actual); - assertThat(((QueryEvent) actual).getTimestamp(), is(1700193011L)); - assertThat(((QueryEvent) actual).getPosition(), is(168785090L)); + assertInstanceOf(MySQLQueryBinlogEvent.class, actual); + assertThat(((MySQLQueryBinlogEvent) actual).getTimestamp(), is(1700193011L)); + assertThat(((MySQLQueryBinlogEvent) actual).getPosition(), is(168785090L)); } @Test @@ -149,8 +149,8 @@ void assertDecodeWriteRowEvent() { binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents); assertThat(decodedEvents.size(), is(1)); LinkedList actualEventList = (LinkedList) decodedEvents.get(0); - assertThat(actualEventList.get(0), instanceOf(WriteRowsEvent.class)); - WriteRowsEvent actual = (WriteRowsEvent) actualEventList.get(0); + assertThat(actualEventList.get(0), instanceOf(MySQLWriteRowsBinlogEvent.class)); + MySQLWriteRowsBinlogEvent actual = (MySQLWriteRowsBinlogEvent) actualEventList.get(0); assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null})); } @@ -167,8 +167,8 @@ void assertDecodeUpdateRowEvent() { binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents); assertThat(decodedEvents.size(), is(1)); LinkedList actualEventList = (LinkedList) decodedEvents.get(0); - assertThat(actualEventList.get(0), instanceOf(UpdateRowsEvent.class)); - UpdateRowsEvent actual = (UpdateRowsEvent) actualEventList.get(0); + assertThat(actualEventList.get(0), instanceOf(MySQLUpdateRowsBinlogEvent.class)); + MySQLUpdateRowsBinlogEvent actual = (MySQLUpdateRowsBinlogEvent) actualEventList.get(0); assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null})); assertThat(actual.getAfterRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("updated".getBytes()), null})); } @@ -185,9 +185,9 @@ void assertDecodeDeleteRowEvent() { binlogEventPacketDecoder.decode(channelHandlerContext, byteBuf, decodedEvents); assertThat(decodedEvents.size(), is(1)); LinkedList actualEventList = (LinkedList) decodedEvents.get(0); - assertThat(actualEventList.get(0), instanceOf(DeleteRowsEvent.class)); - assertThat(actualEventList.get(1), instanceOf(XidEvent.class)); - DeleteRowsEvent actual = (DeleteRowsEvent) actualEventList.get(0); + assertThat(actualEventList.get(0), instanceOf(MySQLDeleteRowsBinlogEvent.class)); + assertThat(actualEventList.get(1), instanceOf(MySQLXidBinlogEvent.class)); + MySQLDeleteRowsBinlogEvent actual = (MySQLDeleteRowsBinlogEvent) actualEventList.get(0); assertThat(actual.getBeforeRows().get(0), is(new Serializable[]{1L, 1, new MySQLBinaryString("SUCCESS".getBytes()), null})); } diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index 51a398810ed15..e883c9a68ad81 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -32,11 +32,11 @@ import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineColumnMetaData; import org.apache.shardingsphere.data.pipeline.core.metadata.model.PipelineTableMetaData; import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.MySQLBinlogPosition; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.DeleteRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.UpdateRowsEvent; -import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.WriteRowsEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLDeleteRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLUpdateRowsBinlogEvent; +import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows.MySQLWriteRowsBinlogEvent; import org.apache.shardingsphere.infra.metadata.caseinsensitive.CaseInsensitiveIdentifier; import org.apache.shardingsphere.test.fixture.jdbc.MockedDriver; import org.junit.jupiter.api.BeforeAll; @@ -141,16 +141,16 @@ void assertWriteRowsEvent() throws ReflectiveOperationException { assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private WriteRowsEvent createWriteRowsEvent() { - WriteRowsEvent result = new WriteRowsEvent(); + private MySQLWriteRowsBinlogEvent createWriteRowsEvent() { + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); result.setDatabaseName(""); result.setTableName("t_order"); result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); return result; } - private List getRecordsByWriteRowsEvent(final WriteRowsEvent rowsEvent) throws ReflectiveOperationException { - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class); + private List getRecordsByWriteRowsEvent(final MySQLWriteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", MySQLWriteRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); } @@ -163,8 +163,8 @@ void assertUpdateRowsEvent() throws ReflectiveOperationException { assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private UpdateRowsEvent createUpdateRowsEvent() { - UpdateRowsEvent result = new UpdateRowsEvent(); + private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() { + MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(); result.setDatabaseName("test"); result.setTableName("t_order"); result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); @@ -172,8 +172,8 @@ private UpdateRowsEvent createUpdateRowsEvent() { return result; } - private List getRecordsByUpdateRowsEvent(final UpdateRowsEvent rowsEvent) throws ReflectiveOperationException { - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class); + private List getRecordsByUpdateRowsEvent(final MySQLUpdateRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", MySQLUpdateRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); } @@ -186,36 +186,36 @@ void assertDeleteRowsEvent() throws ReflectiveOperationException { assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3)); } - private DeleteRowsEvent createDeleteRowsEvent() { - DeleteRowsEvent result = new DeleteRowsEvent(); + private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() { + MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(); result.setDatabaseName(""); result.setTableName("t_order"); result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); return result; } - private List getRecordsByDeleteRowsEvent(final DeleteRowsEvent rowsEvent) throws ReflectiveOperationException { - Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class); + private List getRecordsByDeleteRowsEvent(final MySQLDeleteRowsBinlogEvent rowsEvent) throws ReflectiveOperationException { + Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", MySQLDeleteRowsBinlogEvent.class, PipelineTableMetaData.class); return (List) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData); } @Test void assertPlaceholderEvent() throws ReflectiveOperationException { - List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", AbstractBinlogEvent.class), - incrementalDumper, new PlaceholderEvent()); + List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class), + incrementalDumper, new PlaceholderBinlogEvent()); assertThat(actual.size(), is(1)); } @Test void assertRowsEventFiltered() throws ReflectiveOperationException { - List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", AbstractBinlogEvent.class), + List actual = (List) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", MySQLBaseBinlogEvent.class), incrementalDumper, getFilteredWriteRowsEvent()); assertThat(actual.size(), is(1)); assertThat(actual.get(0), instanceOf(DataRecord.class)); } - private WriteRowsEvent getFilteredWriteRowsEvent() { - WriteRowsEvent result = new WriteRowsEvent(); + private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() { + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); result.setDatabaseName("test"); result.setTableName("t_order"); result.setAfterRows(Collections.singletonList(new Serializable[]{1})); From bbdbf25ed9bdae7a682f9653bbfba353103d4e41 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 16 Aug 2024 22:49:07 +0800 Subject: [PATCH 2/4] Refactor MySQLWriteRowsBinlogEvent --- .../binlog/event/rows/MySQLWriteRowsBinlogEvent.java | 6 +++--- .../client/netty/MySQLBinlogEventPacketDecoder.java | 3 +-- .../incremental/dumper/MySQLIncrementalDumperTest.java | 6 ++---- 3 files changed, 6 insertions(+), 9 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java index 7c52bb4066be8..912d54e14d1a1 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLWriteRowsBinlogEvent.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,9 +26,9 @@ /** * MySQL write rows binlog event. */ +@RequiredArgsConstructor @Getter -@Setter public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { - private List afterRows; + private final List afterRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index 0b6628d0c34a1..cea99e7020755 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -200,9 +200,8 @@ private void decodeTableMapEvent(final MySQLBinlogEventHeader binlogEventHeader, private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(packet.getRows()); initRowsEvent(result, binlogEventHeader, packet.getTableId()); - result.setAfterRows(packet.getRows()); return result; } diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index e883c9a68ad81..78c9fef666356 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -142,10 +142,9 @@ void assertWriteRowsEvent() throws ReflectiveOperationException { } private MySQLWriteRowsBinlogEvent createWriteRowsEvent() { - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"})); result.setDatabaseName(""); result.setTableName("t_order"); - result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); return result; } @@ -215,10 +214,9 @@ void assertRowsEventFiltered() throws ReflectiveOperationException { } private MySQLWriteRowsBinlogEvent getFilteredWriteRowsEvent() { - MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(); + MySQLWriteRowsBinlogEvent result = new MySQLWriteRowsBinlogEvent(Collections.singletonList(new Serializable[]{1})); result.setDatabaseName("test"); result.setTableName("t_order"); - result.setAfterRows(Collections.singletonList(new Serializable[]{1})); return result; } } From e84ecc8dee25be21167491cb87a45459925fefd8 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 16 Aug 2024 22:50:26 +0800 Subject: [PATCH 3/4] Refactor MySQLUpdateRowsBinlogEvent --- .../binlog/event/rows/MySQLUpdateRowsBinlogEvent.java | 8 ++++---- .../client/netty/MySQLBinlogEventPacketDecoder.java | 4 +--- .../incremental/dumper/MySQLIncrementalDumperTest.java | 4 +--- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java index 0ecdebd3ab092..e8b993623685b 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLUpdateRowsBinlogEvent.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,11 +26,11 @@ /** * MySQL update rows binlog event. */ +@RequiredArgsConstructor @Getter -@Setter public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { - private List beforeRows; + private final List beforeRows; - private List afterRows; + private final List afterRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index cea99e7020755..a4969182f9d5f 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -208,10 +208,8 @@ private MySQLWriteRowsBinlogEvent decodeWriteRowsEventV2(final MySQLBinlogEventH private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(); + MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(packet.getRows(), packet.getRows2()); initRowsEvent(result, binlogEventHeader, packet.getTableId()); - result.setBeforeRows(packet.getRows()); - result.setAfterRows(packet.getRows2()); return result; } diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index 78c9fef666356..b49088924a50d 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -163,11 +163,9 @@ void assertUpdateRowsEvent() throws ReflectiveOperationException { } private MySQLUpdateRowsBinlogEvent createUpdateRowsEvent() { - MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(); + MySQLUpdateRowsBinlogEvent result = new MySQLUpdateRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"}), Collections.singletonList(new Serializable[]{101, 1, "OK2"})); result.setDatabaseName("test"); result.setTableName("t_order"); - result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); - result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"})); return result; } From 063f105556e6b88c2b171446293c736bf7db5b01 Mon Sep 17 00:00:00 2001 From: zhangliang Date: Fri, 16 Aug 2024 22:51:23 +0800 Subject: [PATCH 4/4] Refactor MySQLDeleteRowsBinlogEvent --- .../binlog/event/rows/MySQLDeleteRowsBinlogEvent.java | 6 +++--- .../client/netty/MySQLBinlogEventPacketDecoder.java | 3 +-- .../incremental/dumper/MySQLIncrementalDumperTest.java | 3 +-- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java index a2221e988f1f7..e403d6d643c27 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/binlog/event/rows/MySQLDeleteRowsBinlogEvent.java @@ -18,7 +18,7 @@ package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows; import lombok.Getter; -import lombok.Setter; +import lombok.RequiredArgsConstructor; import java.io.Serializable; import java.util.List; @@ -26,9 +26,9 @@ /** * MySQL delete rows binlog event. */ +@RequiredArgsConstructor @Getter -@Setter public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent { - private List beforeRows; + private final List beforeRows; } diff --git a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java index a4969182f9d5f..223c7ed08ea00 100644 --- a/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java +++ b/kernel/data-pipeline/dialect/mysql/src/main/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/client/netty/MySQLBinlogEventPacketDecoder.java @@ -216,9 +216,8 @@ private MySQLUpdateRowsBinlogEvent decodeUpdateRowsEventV2(final MySQLBinlogEven private MySQLDeleteRowsBinlogEvent decodeDeleteRowsEventV2(final MySQLBinlogEventHeader binlogEventHeader, final MySQLPacketPayload payload) { MySQLBinlogRowsEventPacket packet = new MySQLBinlogRowsEventPacket(binlogEventHeader, payload); packet.readRows(binlogContext.getTableMapEvent(packet.getTableId()), payload); - MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(); + MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(packet.getRows()); initRowsEvent(result, binlogEventHeader, packet.getTableId()); - result.setBeforeRows(packet.getRows()); return result; } diff --git a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java index b49088924a50d..cae3e3bb80c33 100644 --- a/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java +++ b/kernel/data-pipeline/dialect/mysql/src/test/java/org/apache/shardingsphere/data/pipeline/mysql/ingest/incremental/dumper/MySQLIncrementalDumperTest.java @@ -184,10 +184,9 @@ void assertDeleteRowsEvent() throws ReflectiveOperationException { } private MySQLDeleteRowsBinlogEvent createDeleteRowsEvent() { - MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(); + MySQLDeleteRowsBinlogEvent result = new MySQLDeleteRowsBinlogEvent(Collections.singletonList(new Serializable[]{101, 1, "OK"})); result.setDatabaseName(""); result.setTableName("t_order"); - result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"})); return result; }