diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
index 11553ccda5a..6205c9c7ab6 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/dispatcher/EventDispatcherImpl.java
@@ -29,7 +29,9 @@
 import io.debezium.pipeline.source.spi.EventMetadataProvider;
 import io.debezium.pipeline.spi.ChangeEventCreator;
 import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
+import io.debezium.relational.TableId;
 import io.debezium.relational.history.HistoryRecord;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.schema.DataCollectionFilters;
 import io.debezium.schema.DataCollectionId;
 import io.debezium.schema.DatabaseSchema;
@@ -45,10 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
+import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
 import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
 import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti
         String historyStr = DOCUMENT_WRITER.write(historyRecord.document());
 
         Struct value = new Struct(schemaChangeValueSchema);
-        value.put(HistoryRecord.Fields.SOURCE, event.getSource());
+        value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
         value.put(HISTORY_RECORD_FIELD, historyStr);
         return value;
     }
 
+    /**
+     * Rewrites the table name in the Source if needed to handle schema changes properly.
+     *
+     * <p>This method addresses a specific issue when renaming multiple tables within a single
+     * statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
+     * customers;}.
+     *
+     * <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema} emits
+     * two separate change events:
+     *
+     * <ul>
+     *   <li>{@code RENAME TABLE customers TO customers_old}
+     *   <li>{@code RENAME TABLE customers_copy TO customers}
+     * </ul>
+     *
+     * <p>Both events share a table name of {@code customers, customers_old} in their source
+     * info, which includes multiple table IDs in a single string.
+     *
+     * <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
+     * schema change:
+     *
+     * <ul>
+     *   <li>{@code customers_old} for the event that renames {@code customers}
+     *   <li>{@code customers} for the event that renames {@code customers_copy}
+     * </ul>
+     *
+     * <p>The problem arises because {@link
+     * org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
+     * multiple table IDs in the source info. As a result, changes for tables defined by the
+     * table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
+     * This can lead to schema changes not being saved in the state, which is crucial for
+     * recovering the job from a snapshot.
+     *
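+     * <p>For illustration (values taken from the rename example above), the first of the two
+     * events is adjusted roughly as follows:
+     *
+     * <pre>{@code
+     * // source info before the rewrite: "table" = "customers, customers_old"
+     * // TableChange#id of the event:    customers_old
+     * // source info after the rewrite:  "table" = "customers_old"
+     * }</pre>
+     *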
+     * <p>To resolve this issue, this method:
+     *
+     * <ol>
+     *   <li>Checks if the source info contains multiple table names.
+     *   <li>Verifies if the {@code TableChange#id} matches one of the table names.
+     *   <li>Updates the source info with the correct table name that conforms to Flink CDC
+     *       expectations, ensuring the schema change is saved correctly.
+     * </ol>
+     *
+     * @param event the schema change event emitted by Debezium.
+     * @return the updated source info with the corrected table name if necessary.
+     */
+    private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
+        Struct sourceInfo = event.getSource();
+        String tableName = sourceInfo.getString(TABLE_NAME_KEY);
+        if (tableName == null || tableName.isEmpty()) {
+            return sourceInfo;
+        }
+
+        List<String> tableNames = parseTableNames(tableName);
+        if (tableNames.size() >= 2 && event.getDdl().toLowerCase().startsWith("rename")) {
+            for (TableChanges.TableChange tableChange : event.getTableChanges()) {
+                String changedTableName = getMatchingTableName(tableNames, tableChange.getId());
+                if (changedTableName != null) {
+                    LOG.debug(
+                            "Rewrite table name from {} to {} on swapping tables",
+                            tableName,
+                            changedTableName);
+                    sourceInfo.put(TABLE_NAME_KEY, changedTableName);
+                }
+            }
+        }
+        return sourceInfo;
+    }
+
+    /**
+     * Decodes table names from a comma-separated string.
+     *
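+     * <p>For example (from the rename scenario above), the input {@code "customers,
+     * customers_old"} is decoded into the two entries {@code customers} and {@code
+     * customers_old}.
+     *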
+     * <p>This method extracts individual table names from a string where multiple table names
+     * are separated by commas. The input string is constructed by {@link
+     * io.debezium.connector.mysql.SourceInfo}.
+     *
+     * @param tableName a comma-separated string containing multiple table names
+     * @return a list of trimmed table names
+     */
+    private List<String> parseTableNames(String tableName) {
+        return Arrays.stream(tableName.split(","))
+                .map(String::trim)
+                .collect(Collectors.toList());
+    }
+
+    private String getMatchingTableName(List<String> tableNames, TableId tableId) {
+        return tableNames.stream()
+                .filter(name -> name.equals(tableId.table()))
+                .findFirst()
+                .orElse(null);
+    }
+
     @Override
     public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
         historizedSchema.applySchemaChange(event);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java
new file mode 100644
index 00000000000..e8f4ac8c24a
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlMultipleTablesRenamingITCase.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cdc.common.utils.TestCaseUtils;
+import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
+import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.flink.streaming.api.CheckpointingMode;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import io.debezium.connector.mysql.MySqlConnection;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Integration tests for handling schema changes with regard to renaming multiple tables within a
+ * single statement.
+ */
+public class MySqlMultipleTablesRenamingITCase {
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlMultipleTablesRenamingITCase.class);
+    @RegisterExtension static MiniClusterExtension miniCluster = new MiniClusterExtension();
+
+    @SuppressWarnings("unchecked")
+    private static final MySqlContainer MYSQL_CONTAINER =
+            (MySqlContainer)
+                    new MySqlContainer()
+                            .withConfigurationOverride(
+                                    buildMySqlConfigWithTimezone(
+                                            getResourceFolder(), getSystemTimeZone()))
+                            .withSetupSQL("docker/setup.sql")
+                            .withDatabaseName("flink-test")
+                            .withUsername("flinkuser")
+                            .withPassword("flinkpw")
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    private final UniqueDatabase customDatabase =
+            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");
+    private final TestTable customers =
+            new TestTable(customDatabase, "customers", TestTableSchemas.CUSTOMERS);
+
+    private MySqlConnection connection;
+
+    @BeforeAll
+    public static void before() throws Exception {
+        MYSQL_CONTAINER.start();
+    }
+
+    @AfterAll
+    public static void after() throws Exception {
+        MYSQL_CONTAINER.stop();
+    }
+
+    @BeforeEach
+    void prepare() throws Exception {
+        connection = getConnection();
+        customDatabase.createAndInitialize();
+        flushLogs();
+    }
+
+    @AfterEach
+    void tearDown() throws Exception {
+        customDatabase.dropDatabase();
+        connection.close();
+    }
+
+    /**
+     * Tests handling of renaming multiple tables within a single SQL statement in a Flink CDC
+     * job.
+     *
+     * <p>This integration test validates that schema changes involving multiple table renames,
+     * such as {@code RENAME TABLE table1 TO table1_old, table2 TO table1}, are correctly
+     * processed without data loss or inconsistency.
+     *
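+     * <p>In outline, the test issues the following DDL (a simplified view of the numbered steps
+     * in the test body, using the unqualified table name):
+     *
+     * <pre>{@code
+     * CREATE TABLE customers_copy LIKE customers;
+     * ALTER TABLE customers_copy DROP COLUMN phone_number;
+     * RENAME TABLE customers TO customers_old, customers_copy TO customers;
+     * }</pre>
+     *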
+     * <p>The test covers:
+     *
+     * <ul>
+     *   <li>Reading the initial snapshot of the {@code customers} table.
+     *   <li>Processing a {@code RENAME TABLE} statement that swaps two tables in one statement.
+     *   <li>Stopping the job with a savepoint and restoring it from that savepoint.
+     * </ul>
+     *
+     * <p>This ensures that the connector can accurately process and persist schema changes when
+     * tables are swapped, addressing potential issues with table filtering or mismatched table
+     * IDs during schema updates.
+     */
+    @Test
+    void testRenameTablesWithinSingleStatement() throws Exception {
+        // Build Flink job
+        StreamExecutionEnvironment env = getExecutionEnvironment();
+        MySqlSource<String> source = getSourceBuilder().build();
+        DataStreamSource<String> stream =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), "rename-tables-test");
+        CollectResultIterator<String> iterator = addCollector(env, stream);
+
+        // Copy transformations into another env
+        StreamExecutionEnvironment restoredEnv = getExecutionEnvironment();
+        duplicateTransformations(env, restoredEnv);
+
+        // Execute job and validate results
+        JobClient jobClient = env.executeAsync();
+        iterator.setJobClient(jobClient);
+
+        {
+            String[] expected =
+                    new String[] {
+                        "{\"id\":101,\"name\":\"user_1\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":102,\"name\":\"user_2\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":103,\"name\":\"user_3\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":109,\"name\":\"user_4\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":110,\"name\":\"user_5\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":111,\"name\":\"user_6\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":118,\"name\":\"user_7\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":121,\"name\":\"user_8\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":123,\"name\":\"user_9\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1009,\"name\":\"user_10\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1010,\"name\":\"user_11\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1011,\"name\":\"user_12\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1012,\"name\":\"user_13\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1013,\"name\":\"user_14\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1014,\"name\":\"user_15\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1015,\"name\":\"user_16\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1016,\"name\":\"user_17\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1017,\"name\":\"user_18\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1018,\"name\":\"user_19\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":1019,\"name\":\"user_20\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                        "{\"id\":2000,\"name\":\"user_21\",\"address\":\"Shanghai\",\"phone_number\":\"123567891234\"}",
+                    };
+            List<String> rows = fetchRow(iterator, 21);
+            TestCaseUtils.repeatedCheck(
+                    () -> Arrays.stream(expected).allMatch(rows.toString()::contains));
+        }
+
+        {
+            LOG.info("Step 1: Create a copy of the target table");
+            executeStatements(
+                    String.format(
+                            "CREATE TABLE %s_copy LIKE %s;",
+                            customers.getTableId(), customers.getTableId()));
+
+            LOG.info("Step 2: Alter the copied table to drop a column");
+            executeStatements(
+                    String.format(
+                            "ALTER TABLE %s_copy DROP COLUMN phone_number;",
+                            customers.getTableId()));
+
+            LOG.info("Step 3: Swap the tables");
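+            // A single RENAME TABLE statement swaps both tables atomically; Debezium reports the
+            // resulting schema changes as two events that share one comma-separated table name in
+            // their source info, which is the case rewriteTableNameIfNeeded() handles.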
+            executeStatements(
+                    String.format(
+                            "RENAME TABLE %s TO %s_old, %s_copy TO %s;",
+                            customers.getTableId(),
+                            customers.getTableId(),
+                            customers.getTableId(),
+                            customers.getTableId()));
+
+            LOG.info("Step 4: Insert data into the altered table before the savepoint");
+            executeStatements(
+                    String.format(
+                            "INSERT INTO %s VALUES (19213, 'Diana', 'Berlin');",
+                            customers.getTableId()));
+
+            List<String> rowsBeforeRestored = fetchRow(iterator, 1);
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            rowsBeforeRestored
+                                    .toString()
+                                    .contains(
+                                            "{\"id\":19213,\"name\":\"Diana\",\"address\":\"Berlin\"}"));
+        }
+
+        {
+            LOG.info("Step 5: Take a savepoint");
+            Path savepointDir = Files.createTempDirectory("rename-tables-test");
+            String savepointPath =
+                    jobClient
+                            .stopWithSavepoint(
+                                    false,
+                                    savepointDir.toAbsolutePath().toString(),
+                                    SavepointFormatType.DEFAULT)
+                            .get();
+
+            LOG.info("Step 6: Insert data into the altered table after the savepoint");
+            executeStatements(
+                    String.format(
+                            "INSERT INTO %s VALUES (19214, 'Diana2', 'Berlin2');",
+                            customers.getTableId()));
+
+            LOG.info("Step 7: Restart the job from savepoint");
+            setupSavepoint(restoredEnv, savepointPath);
+            JobClient restoredJobClient = restoredEnv.executeAsync("rename-tables-test");
+            iterator.setJobClient(restoredJobClient);
+            List<String> rowsAfterRestored = fetchRow(iterator, 1);
+            TestCaseUtils.repeatedCheck(
+                    () ->
+                            rowsAfterRestored
+                                    .toString()
+                                    .contains(
+                                            "{\"id\":19214,\"name\":\"Diana2\",\"address\":\"Berlin2\"}"));
+            restoredJobClient.cancel().get();
+        }
+    }
+
+    private MySqlSourceBuilder<String> getSourceBuilder() {
+        return MySqlSource.<String>builder()
+                .hostname(MYSQL_CONTAINER.getHost())
+                .port(MYSQL_CONTAINER.getDatabasePort())
+                .username(customDatabase.getUsername())
+                .password(customDatabase.getPassword())
+                .databaseList(customDatabase.getDatabaseName())
+                .tableList(customers.getTableId())
+                .deserializer(new JsonDebeziumDeserializationSchema());
+    }
+
+    private MySqlConnection getConnection() {
+        Map<String, String> properties = new HashMap<>();
+        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
+        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
+        properties.put("database.user", customDatabase.getUsername());
+        properties.put("database.password", customDatabase.getPassword());
+        io.debezium.config.Configuration configuration =
+                io.debezium.config.Configuration.from(properties);
+        return DebeziumUtils.createMySqlConnection(configuration, new Properties());
+    }
+
+    private void executeStatements(String... statements) throws Exception {
+        connection.execute(statements);
+        connection.commit();
+    }
+
+    private void flushLogs() throws Exception {
+        executeStatements("FLUSH LOGS;");
+    }
+
+    private <T> CollectResultIterator<T> addCollector(
+            StreamExecutionEnvironment env, DataStream<T> stream) {
+        TypeSerializer<T> serializer =
+                stream.getTransformation().getOutputType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig(),
+                        10000L);
+        CollectStreamSink<T> sink = new CollectStreamSink<>(stream, factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        return iterator;
+    }
+
+    private static List<String> fetchRow(Iterator<String> iter, int size) {
+        List<String> rows = new ArrayList<>(size);
+        while (size > 0 && iter.hasNext()) {
+            String row = iter.next();
+            rows.add(row);
+            size--;
+        }
+        return rows;
+    }
+
+    private static String buildMySqlConfigWithTimezone(File resourceDirectory, String timezone) {
+        try {
+            TemporaryFolder tempFolder = new TemporaryFolder(resourceDirectory);
+            tempFolder.create();
+            File folder = tempFolder.newFolder(String.valueOf(UUID.randomUUID()));
+            Path cnf = Files.createFile(Paths.get(folder.getPath(), "my.cnf"));
+            String mysqldConf =
+                    "[mysqld]\n"
+                            + "binlog_format = row\n"
+                            + "log_bin = mysql-bin\n"
+                            + "server-id = 223344\n"
+                            + "binlog_row_image = FULL\n"
+                            + "gtid_mode = on\n"
+                            + "enforce_gtid_consistency = on\n";
+            String timezoneConf = "default-time_zone = '" + timezone + "'\n";
+            Files.write(
+                    cnf,
+                    Collections.singleton(mysqldConf + timezoneConf),
+                    StandardCharsets.UTF_8,
+                    StandardOpenOption.APPEND);
+            return Paths.get(resourceDirectory.getAbsolutePath()).relativize(cnf).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create my.cnf file.", e);
+        }
+    }
+
+    private static File getResourceFolder() {
+        try {
+            return Paths.get(
+                            Objects.requireNonNull(
+                                            SpecificStartingOffsetITCase.class
+                                                    .getClassLoader()
+                                                    .getResource("."))
+                                    .toURI())
+                    .toFile();
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to get the resource folder.", e);
+        }
+    }
+
+    private static String getSystemTimeZone() {
+        return ZoneId.systemDefault().toString();
+    }
+
+    private void setupSavepoint(StreamExecutionEnvironment env, String savepointPath)
+            throws Exception {
+        // restore from savepoint
+        // hack for tests: reflectively access the protected configuration of
+        // StreamExecutionEnvironment
+        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+        Class<?> clazz =
+                classLoader.loadClass(
+                        "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment");
+        Field field = clazz.getDeclaredField("configuration");
+        field.setAccessible(true);
+        Configuration configuration = (Configuration) field.get(env);
+        configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, savepointPath);
+    }
+
+    private void duplicateTransformations(
+            StreamExecutionEnvironment source, StreamExecutionEnvironment target) {
+        source.getTransformations().forEach(target::addOperator);
+    }
+
+    private StreamExecutionEnvironment getExecutionEnvironment() {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
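+        // With EXACTLY_ONCE mode, the collect sink only emits results on completed checkpoints,
+        // so a short interval (100 ms) keeps the test's fetchRow() calls responsive.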
+        env.enableCheckpointing(100);
+        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
+        return env;
+    }
+}