Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rename MySQLBaseBinlogEvent #32563

Merged
merged 4 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import lombok.Setter;

/**
* Abstract binlog event.
* MySQL base binlog event.
*/
@Getter
@Setter
public abstract class AbstractBinlogEvent {
public abstract class MySQLBaseBinlogEvent {

private String fileName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@
/**
* Placeholder binlog event, unsupported binlog event will replace it into this class.
*/
public final class PlaceholderEvent extends AbstractBinlogEvent {
public final class PlaceholderBinlogEvent extends MySQLBaseBinlogEvent {
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.query;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* Query event.This event is written into the binary log file for:
Expand All @@ -30,7 +31,7 @@
*/
@RequiredArgsConstructor
@Getter
public final class QueryEvent extends AbstractBinlogEvent {
public final class MySQLQueryBinlogEvent extends MySQLBaseBinlogEvent {

private final long threadId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* Abstract rows event.
* MySQL rows base event.
*/
@Getter
@Setter
public abstract class AbstractRowsEvent extends AbstractBinlogEvent {
public abstract class MySQLBaseRowsBinlogEvent extends MySQLBaseBinlogEvent {

private String databaseName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Delete rows event.
* MySQL delete rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class DeleteRowsEvent extends AbstractRowsEvent {
public final class MySQLDeleteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> beforeRows;
private final List<Serializable[]> beforeRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Update rows event.
* MySQL update rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class UpdateRowsEvent extends AbstractRowsEvent {
public final class MySQLUpdateRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> beforeRows;
private final List<Serializable[]> beforeRows;

private List<Serializable[]> afterRows;
private final List<Serializable[]> afterRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.rows;

import lombok.Getter;
import lombok.Setter;
import lombok.RequiredArgsConstructor;

import java.io.Serializable;
import java.util.List;

/**
* Write rows event.
* MySQL write rows binlog event.
*/
@RequiredArgsConstructor
@Getter
@Setter
public final class WriteRowsEvent extends AbstractRowsEvent {
public final class MySQLWriteRowsBinlogEvent extends MySQLBaseRowsBinlogEvent {

private List<Serializable[]> afterRows;
private final List<Serializable[]> afterRows;
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@
* limitations under the License.
*/

package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event;
package org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.transaction;

import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;

/**
* XID event is generated for a COMMIT of a transaction that modifies one or more tables of an XA-capable storage engine.
Expand All @@ -27,7 +28,7 @@
*/
@RequiredArgsConstructor
@Getter
public final class XidEvent extends AbstractBinlogEvent {
public final class MySQLXidBinlogEvent extends MySQLBaseBinlogEvent {

private final long xid;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.shardingsphere.data.pipeline.core.exception.PipelineInternalException;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.AbstractBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.MySQLBaseBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.binlog.event.PlaceholderBinlogEvent;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLBinlogEventPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLCommandPacketDecoder;
import org.apache.shardingsphere.data.pipeline.mysql.ingest.incremental.client.netty.MySQLNegotiateHandler;
Expand Down Expand Up @@ -77,7 +77,7 @@ public final class MySQLBinlogClient {

private final boolean decodeWithTX;

private final ArrayBlockingQueue<List<AbstractBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);
private final ArrayBlockingQueue<List<MySQLBaseBinlogEvent>> blockingEventQueue = new ArrayBlockingQueue<>(2500);

private EventLoopGroup eventLoopGroup;

Expand Down Expand Up @@ -226,8 +226,8 @@ private void dumpBinlog(final String binlogFileName, final long binlogPosition,
channel.writeAndFlush(new MySQLComBinlogDumpCommandPacket((int) binlogPosition, connectInfo.getServerId(), binlogFileName));
}

private AbstractBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
PlaceholderEvent result = new PlaceholderEvent();
private MySQLBaseBinlogEvent getLastBinlogEvent(final String binlogFileName, final long binlogPosition) {
PlaceholderBinlogEvent result = new PlaceholderBinlogEvent();
result.setFileName(binlogFileName);
result.setPosition(binlogPosition);
return result;
Expand All @@ -242,12 +242,12 @@ private void resetSequenceID() {
*
* @return binlog event
*/
public synchronized List<AbstractBinlogEvent> poll() {
public synchronized List<MySQLBaseBinlogEvent> poll() {
if (!running) {
return Collections.emptyList();
}
try {
List<AbstractBinlogEvent> result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
List<MySQLBaseBinlogEvent> result = blockingEventQueue.poll(100L, TimeUnit.MILLISECONDS);
return null == result ? Collections.emptyList() : result;
} catch (final InterruptedException ignored) {
Thread.currentThread().interrupt();
Expand Down Expand Up @@ -314,11 +314,11 @@ public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cau

private final class MySQLBinlogEventHandler extends ChannelInboundHandlerAdapter {

private final AtomicReference<AbstractBinlogEvent> lastBinlogEvent;
private final AtomicReference<MySQLBaseBinlogEvent> lastBinlogEvent;

private final AtomicBoolean reconnectRequested = new AtomicBoolean(false);

MySQLBinlogEventHandler(final AbstractBinlogEvent lastBinlogEvent) {
MySQLBinlogEventHandler(final MySQLBaseBinlogEvent lastBinlogEvent) {
this.lastBinlogEvent = new AtomicReference<>(lastBinlogEvent);
}

Expand All @@ -329,7 +329,7 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
return;
}
if (msg instanceof List) {
List<AbstractBinlogEvent> records = (List<AbstractBinlogEvent>) msg;
List<MySQLBaseBinlogEvent> records = (List<MySQLBaseBinlogEvent>) msg;
if (records.isEmpty()) {
log.warn("The records is empty");
return;
Expand All @@ -338,8 +338,8 @@ public void channelRead(final ChannelHandlerContext ctx, final Object msg) throw
blockingEventQueue.put(records);
return;
}
if (msg instanceof AbstractBinlogEvent) {
lastBinlogEvent.set((AbstractBinlogEvent) msg);
if (msg instanceof MySQLBaseBinlogEvent) {
lastBinlogEvent.set((MySQLBaseBinlogEvent) msg);
blockingEventQueue.put(Collections.singletonList(lastBinlogEvent.get()));
}
}
Expand Down
Loading