diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
index 11553ccda5a..6205c9c7ab6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
@@ -29,7 +29,9 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
+import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
@@ -45,10 +47,14 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti
String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
Struct value = new Struct(schemaChangeValueSchema);
- value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+ value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
value.put(HISTORY_RECORD_FIELD, historyStr);
return value;
}
+ /**
+ * Rewrites the table name in the Source if needed to handle schema changes properly.
+ *
+ * <p>This method addresses a specific issue when renaming multiple tables within a single
+ * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+ * customers;}.
+ *
+ * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
+ * emits two separate change events, one for each rename clause in the statement.
+ *
+ * <p>Both events share a table name of {@code customers, customers_old} in their source
+ * info, which includes multiple table IDs in a single string.
+ *
+ * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+ * single table affected by each schema change.
+ *
+ * <p>The problem arises because {@link
+ * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+ * multiple table IDs in the source info. As a result, changes for tables defined by the
+ * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+ * This can lead to schema changes not being saved in the state, which is crucial for
+ * recovering the job from a snapshot.
+ *
+ * <p>To avoid this, the table name in the source info is rewritten to the single table ID
+ * of the corresponding table change.
+ */
+ private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+ Struct sourceInfo = event.getSource();
+ String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+ List<String> tableNames = parseTableNames(tableName);
+ if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
+ for (TableChanges.TableChange tableChange : event.getTableChanges()) {
+ String changedTableName = getMatchingTableName(tableNames, tableChange.getId());
+ if (changedTableName != null) {
+ LOG.debug(
+ "Rewrite table name from {} to {} on swapping tables",
+ tableName,
+ changedTableName);
+ sourceInfo.put(TABLE_NAME_KEY, changedTableName);
+ }
+ }
+ }
+ return sourceInfo;
+ }
+
+ /**
+ * Decodes table names from a comma-separated string.
+ *
+ * <p>This method extracts individual table names from a string where multiple table names
+ * are separated by commas. The input string is constructed by {@link
+ * io.debezium.connector.mysql.SourceInfo}.
+ *
+ * @param tableName a comma-separated string containing multiple table names
+ * @return a list of trimmed table names
+ */
+ private List<String> parseTableNames(String tableName) {
+ return Arrays.stream(tableName.split(","))
+ .map(String::trim)
+ .collect(Collectors.toList());
+ }
+
+ private String getMatchingTableName(List<String> tableNames, TableId tableId) {
+ return tableNames.stream()
+ .filter(name -> name.equals(tableId.table()))
+ .findFirst()
+ .orElse(null);
+ }
+
@Override
public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
historizedSchema.applySchemaChange(event);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java
new file mode 100644
index 00000000000..e8f4ac8c24a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Integration tests for handling schema changes related to renaming multiple tables within a
+ * single statement.
+ */
+public class MySqlMultipleTablesRenamingITCase {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MySqlMultipleTablesRenamingITCase.class);
+ @RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension();
+
+ @SuppressWarnings("unchecked")
+ private static final MySqlContainer MYSQL_CONTAINER =
+ (MySqlContainer)
+ new MySqlContainer()
+ .withConfigurationOverride(
+ buildMySqlConfigWithTimezone(
+ getResourceFolder(), getSystemTimeZone()))
+ .withSetupSQL("docker/setup.sql")
+ .withDatabaseName("flink-test")
+ .withUsername("flinkuser")
+ .withPassword("flinkpw")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ private final UniqueDatabase customDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+ private final TestTable customers =
+ new TestTable(customDatabase, "customers", TestTableSchemas.CUSTOMERS);
+
+ private MySqlConnection connection;
+
+ @BeforeAll
+ public static void before() throws Exception {
+ MYSQL_CONTAINER.start();
+ }
+
+ @AfterAll
+ public static void after() throws Exception {
+ MYSQL_CONTAINER.stop();
+ }
+
+ @BeforeEach
+ void prepare() throws Exception {
+ connection = getConnection();
+ customDatabase.createAndInitialize();
+ flushLogs();
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ customDatabase.dropDatabase();
+ connection.close();
+ }
+
+ /**
+ * Tests handling of renaming multiple tables within a single SQL statement in a Flink CDC job.
+ *
+ * <p>This integration test validates that schema changes involving multiple table renames, such
+ * as {@code RENAME TABLE table1 TO table1_old, table2 TO table1}, are correctly processed
+ * without data loss or inconsistency.
+ *
+ * <p>The test covers:
+ *
+ * <ul>
+ * <li>Initial validation of table contents before renaming.
+ * <li>Steps to rename tables, including schema changes like column drops.
+ * <li>Ensuring data integrity during savepoints and job restarts.
+ * <li>Validation of data consumption before and after savepoints to confirm state
+ * correctness.
+ * </ul>
+ *
+ * <p>This ensures that the connector can accurately process and persist schema changes when
+ * tables are swapped, addressing potential issues with table filtering or mismatched table IDs
+ * during schema updates.
+ */
+ @Test
+ void testRenameTablesWithinSingleStatement() throws Exception {
+ // Build Flink job
+ StreamExecutionEnvironment env = getExecutionEnvironment();
+ MySqlSource<String> source = getSourceBuilder().build();
+ DataStreamSource<String> stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "rename-tables-test");
+ CollectResultIterator<String> iterator = addCollector(env, stream);
+
+ // Copy transformations into another env
+ StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
+ duplicateTransformations(env, restoredEnv);
+
+ // Execute job and validate results
+ JobClient jobClient = env.executeAsync();
+ iterator.setJobClient(jobClient);
+
+ {
+ String[] expected =
+ new String[] {
+ "{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ "{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+ };
+ List<String> rows = fetchRow(iterator, 21);
+ TestCaseUtils.repeatedCheck(
+ () -> Arrays.stream(expected).allMatch(rows.toString()::contains));
+ }
+
+ {
+ LOG.info("Step 1: Create a copy of the target table");
+ executeStatements(
+ String.format(
+ "CREATE TABLE %s_copy LIKE %s;",
+ customers.getTableId(), customers.getTableId()));
+
+ LOG.info("Step 2: Alter the copied table to drop a column");
+ executeStatements(
+ String.format(
+ "ALTER TABLE %s_copy DROP COLUMN phone_number;",
+ customers.getTableId()));
+
+ LOG.info("Step 3: Swap the tables");
+ executeStatements(
+ String.format(
+ "RENAME TABLE %s TO %s_old, %s_copy TO %s;",
+ customers.getTableId(),
+ customers.getTableId(),
+ customers.getTableId(),
+ customers.getTableId()));
+
+ LOG.info("Step 4: Insert data into the altered table before the savepoint");
+ executeStatements(
+ String.format(
+ "INSERT INTO %s VALUES (19213, 'Diana', 'Berlin');",
+ customers.getTableId()));
+
+ List<String> rowsBeforeRestored = fetchRow(iterator, 1);
+ TestCaseUtils.repeatedCheck(
+ () ->
+ rowsBeforeRestored
+ .toString()
+ .contains(
+ "{\"id\":19213,\"name\":\"Diana\",\"address\":\"Berlin\"}"));
+ }
+
+ {
+ LOG.info("Step 5: Take a savepoint");
+ Path savepointDir = Files.createTempDirectory("rename-tables-test");
+ String savepointPath =
+ jobClient
+ .stopWithSavepoint(
+ false,
+ savepointDir.toAbsolutePath().toString(),
+ SavepointFormatType.DEFAULT)
+ .get();
+
+ LOG.info("Step 6: Insert data into the altered table after the savepoint");
+ executeStatements(
+ String.format(
+ "INSERT INTO %s VALUES (19214, 'Diana2', 'Berlin2');",
+ customers.getTableId()));
+
+ LOG.info("Step 7: Restart the job from savepoint");
+ setupSavepoint(restoredEnv, savepointPath);
+ JobClient restoredJobClient = restoredEnv.executeAsync("rename-tables-test");
+ iterator.setJobClient(restoredJobClient);
+ List<String> rowsAfterRestored = fetchRow(iterator, 1);
+ TestCaseUtils.repeatedCheck(
+ () ->
+ rowsAfterRestored
+ .toString()
+ .contains(
+ "{\"id\":19214,\"name\":\"Diana2\",\"address\":\"Berlin2\"}"));
+ restoredJobClient.cancel().get();
+ }
+ }
+
+ private MySqlSourceBuilder<String> getSourceBuilder() {
+ return MySqlSource.<String>builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .username(customDatabase.getUsername())
+ .password(customDatabase.getPassword())
+ .databaseList(customDatabase.getDatabaseName())
+ .tableList(customers.getTableId())
+ .deserializer(new JsonDebeziumDeserializationSchema());
+ }
+
+ private MySqlConnection getConnection() {
+ Map<String, String> properties = new HashMap<>();
+ properties.put("database.hostname", MYSQL_CONTAINER.getHost());
+ properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+ properties.put("database.user", customDatabase.getUsername());
+ properties.put("database.password", customDatabase.getPassword());
+ io.debezium.config.Configuration configuration =
+ io.debezium.config.Configuration.from(properties);
+ return DebeziumUtils.createMySqlConnection(configuration, new Properties());
+ }
+
+ private void executeStatements(String... statements) throws Exception {
+ connection.execute(statements);
+ connection.commit();
+ }
+
+ private void flushLogs() throws Exception {
+ executeStatements("FLUSH LOGS;");
+ }
+
+ private CollectResultIterator<String> addCollector(
+ StreamExecutionEnvironment env, DataStream<String> stream) {
+ TypeSerializer<String> serializer =
+ stream.getTransformation().getOutputType().createSerializer(env.getConfig());
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectSinkOperatorFactory<String> factory =
+ new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+ CollectSinkOperator<String> operator = (CollectSinkOperator<String>) factory.getOperator();
+ CollectResultIterator<String> iterator =
+ new CollectResultIterator<>(
+ operator.getOperatorIdFuture(),
+ serializer,
+ accumulatorName,
+ env.getCheckpointConfig(),
+ 10000L);
+ CollectStreamSink<String> sink = new CollectStreamSink<>(stream, factory);
+ sink.name("Data stream collect sink");
+ env.addOperator(sink.getTransformation());
+ return iterator;
+ }
+
+ private static List<String> fetchRow(Iterator<String> iter, int size) {
+ List<String> rows = new ArrayList<>(size);
+ while (size > 0 && iter.hasNext()) {
+ String row = iter.next();
+ rows.add(row);
+ size--;
+ }
+ return rows;
+ }
+
+ private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) {
+ try {
+ TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory);
+ tempFolder.create();
+ File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
+ Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
+ String mysqldConf =
+ "[mysqld]\n"
+ + "binlog_format = row\n"
+ + "log_bin = mysql-bin\n"
+ + "server-id = 223344\n"
+ + "binlog_row_image = FULL\n"
+ + "gtid_mode = on\n"
+ + "enforce_gtid_consistency = on\n";
+ String timezoneConf = "default-time_zone = '" + timezone + "'\n";
+ Files.write(
+ cnf,
+ Collections.singleton(mysqldConf + timezoneConf),
+ StandardCharsets.UTF_8,
+ StandardOpenOption.APPEND);
+ return Paths.get(resourceDirectory.getAbsolutePath()).relativize(cnf).toString();
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to create my.cnf file.", e);
+ }
+ }
+
+ private static File getResourceFolder() {
+ try {
+ return Paths.get(
+ Objects.requireNonNull(
+ SpecificStartingOffsetITCase.class
+ .getClassLoader()
+ .getResource("."))
+ .toURI())
+ .toFile();
+ } catch (Exception e) {
+ throw new FlinkRuntimeException("Get Resource File Directory fail");
+ }
+ }
+
+ private static String getSystemTimeZone() {
+ return ZoneId.systemDefault().toString();
+ }
+
+ private void setupSavepoint(StreamExecutionEnvironment env, String savepointPath)
+ throws Exception {
+ // restore from savepoint
+ // hack for tests: reflectively access the non-public StreamExecutionEnvironment#configuration field
+ ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+ Class> clazz =
+ classLoader.loadClass(
+ "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
+ Field field = clazz.getDeclaredField("configuration");
+ field.setAccessible(true);
+ Configuration configuration = (Configuration) field.get(env);
+ configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+ }
+
+ private void duplicateTransformations(
+ StreamExecutionEnvironment source, StreamExecutionEnvironment target) {
+ source.getTransformations().forEach(target::addOperator);
+ }
+
+ private StreamExecutionEnvironment getExecutionEnvironment() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.enableCheckpointing(100);
+ env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+ return env;
+ }
+}