Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-36945] MySQL CDC supports parsing RENAME TABLE Statement with multiple tables #3876

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.ChangeEventCreator;
import io.debezium.pipeline.spi.SchemaChangeEventEmitter;
import io.debezium.relational.TableId;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import io.debezium.schema.DataCollectionFilters;
import io.debezium.schema.DataCollectionId;
import io.debezium.schema.DatabaseSchema;
Expand All @@ -45,10 +47,14 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY;
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY;

Expand Down Expand Up @@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti
String historyStr = DOCUMENT_WRITER.write(historyRecord.document());

Struct value = new Struct(schemaChangeValueSchema);
value.put(HistoryRecord.Fields.SOURCE, event.getSource());
value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event));
value.put(HISTORY_RECORD_FIELD, historyStr);
return value;
}

/**
* Rewrites the table name in the Source if needed to handle schema changes properly.
*
* <p>This method addresses a specific issue when renaming multiple tables within a single
* statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO
* customers;}.
*
* <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema}
* emits two separate change events:
*
* <ul>
* <li>{@code RENAME TABLE customers TO customers_old}
* <li>{@code RENAME TABLE customers_copy TO customers}
* </ul>
*
* <p>Both events share a table name of {@code customers, customers_old} in their source
* info, which includes multiple table IDs in a single string.
*
* <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the
* schema change:
*
* <ul>
* <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code
* customers} ID.
* <li>The change for {@code RENAME TABLE customers TO customers_old} is empty.
* </ul>
*
* <p>The problem arises because {@link
* org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect
* multiple table IDs in the source info. As a result, changes for tables defined by the
* table filter configuration (e.g., {@code customers}) may be filtered out unintentionally.
* This can lead to schema changes not being saved in the state, which is crucial for
* recovering the job from a snapshot.
*
* <p>To resolve this issue, this method:
*
* <ol>
* <li>Checks if the source info contains multiple table names.
* <li>Verifies if the {@code TableChange#id} matches one of the table names.
* <li>Updates the source info with the correct table name that conforms to Flink CDC
* expectations, ensuring the schema change is saved correctly.
* </ol>
*
* @param event the schema change event emitted by Debezium.
* @return the updated source info with the corrected table name if necessary.
*/
private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) {
Struct sourceInfo = event.getSource();
String tableName = sourceInfo.getString(TABLE_NAME_KEY);
if (tableName == null || tableName.isEmpty()) {
return sourceInfo;
}

List<String> tableNames = parseTableNames(tableName);
if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems some other DDL events like DROP TABLE A, B also carry multiple tableIds in their TABLE_NAME_KEY field. Could they be rewritten by this method?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rewrite logic explicitly checks that the DDL starts with "RENAME" using this condition:

if (...event.getDdl().toLowerCase().startsWith("rename"))

So, other DDL events like DROP TABLE won’t be affected by this method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking there might be more DDLs that manipulates multiple tables besides RENAME TABLE, and this method could be extended to rewrite them, too. Maybe it should be done later in another ticket :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That’s a great idea! I agree that the method could be extended to handle other multi-table DDLs in the future. For now, I'll keep the focus on RENAME TABLE.

for (TableChanges.TableChange tableChange : event.getTableChanges()) {
Copy link
Contributor

@yuxiqian yuxiqian Feb 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC the tricky DDL event is RENAME TABLE A TO A_old, A_copy TO A, and Debezium would generate two SchemaChangeEvents, Rename(A => A_old) and Rename(A_COPY => A). Both of them carry A,A_old in the TABLE_NAME_KEY field.

But seems the rewrite logic will not work for the previous event (Rename(A => A_old)) because A_old is excluded from the capturing list, so it will be ignored when constructing SchemaChangeEvent [1], [2], and getTableChanges will return an empty set [3]. Is such behavior intended?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the behavior is intentional. We don't need to handle events related to A_old since it's not part of the capturing list.

To ensure schema changes and data ingestion keep up correctly, it's sufficient to process rename events for tables like A that are in the capturing list. This approach avoids unnecessary processing for tables we're not actively monitoring.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. Thanks for clarification!

String changedTableName = getMatchingTableName(tableNames, tableChange.getId());
if (changedTableName != null) {
LOG.debug(
"Rewrite table name from {} to {} on swapping tables",
tableName,
changedTableName);
sourceInfo.put(TABLE_NAME_KEY, changedTableName);
}
}
}
return sourceInfo;
}

/**
* Decodes table names from a comma-separated string.
*
* <p>This method extracts individual table names from a string where multiple table names
* are separated by commas. The input string is constructed by {@link
* io.debezium.connector.mysql.SourceInfo}.
*
* @param tableName a comma-separated string containing multiple table names
* @return a list of trimmed table names
*/
private List<String> parseTableNames(String tableName) {
return Arrays.stream(tableName.split(","))
.map(String::trim)
.collect(Collectors.toList());
}

private String getMatchingTableName(List<String> tableNames, TableId tableId) {
return tableNames.stream()
.filter(name -> name.equals(tableId.table()))
.findFirst()
.orElse(null);
}

@Override
public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
historizedSchema.applySchemaChange(event);
Expand Down
Loading
Loading