diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java index c6aebc8aad7..0b6ea40eaa9 100644 --- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java +++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/reader/fetch/MySqlSourceFetchTaskContext.java @@ -18,6 +18,7 @@ package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.reader.fetch; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.utils.ReflectionUtils; import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig; import org.apache.seatunnel.connectors.cdc.base.dialect.JdbcDataSourceDialect; import org.apache.seatunnel.connectors.cdc.base.relational.JdbcSourceEventDispatcher; @@ -70,6 +71,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY; import static org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils.MySqlConnectionUtils.createBinaryClient; @@ -326,13 +328,27 @@ public MySqlTaskContextImpl( MySqlDatabaseSchema schema, BinaryLogClient reusedBinaryLogClient) { super(config, schema); - this.reusedBinaryLogClient = reusedBinaryLogClient; + this.reusedBinaryLogClient = resetBinaryLogClient(reusedBinaryLogClient); } @Override public BinaryLogClient getBinaryLogClient() { return reusedBinaryLogClient; } + + /** reset the listener of binaryLogClient before fetch task start. */ + private BinaryLogClient resetBinaryLogClient(BinaryLogClient binaryLogClient) { + Optional eventListenersField = + ReflectionUtils.getField( + binaryLogClient, BinaryLogClient.class, "eventListeners"); + eventListenersField.ifPresent(o -> ((List) o).clear()); + Optional lifecycleListeners = + ReflectionUtils.getField( + binaryLogClient, BinaryLogClient.class, "lifecycleListeners"); + lifecycleListeners.ifPresent( + o -> ((List) o).clear()); + return binaryLogClient; + } } /** Copied from debezium for accessing here. */