[FLINK-36945] MySQL CDC supports parsing RENAME TABLE Statement with multiple tables #3876
base: master
Changes from all commits
0e02ddf
403c670
4fb3b7d
a29d93b
81ae25b
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -29,7 +29,9 @@ | |
import io.debezium.pipeline.source.spi.EventMetadataProvider; | ||
import io.debezium.pipeline.spi.ChangeEventCreator; | ||
import io.debezium.pipeline.spi.SchemaChangeEventEmitter; | ||
import io.debezium.relational.TableId; | ||
import io.debezium.relational.history.HistoryRecord; | ||
import io.debezium.relational.history.TableChanges; | ||
import io.debezium.schema.DataCollectionFilters; | ||
import io.debezium.schema.DataCollectionId; | ||
import io.debezium.schema.DatabaseSchema; | ||
|
@@ -45,10 +47,14 @@ | |
import org.slf4j.LoggerFactory; | ||
|
||
import java.io.IOException; | ||
import java.util.Arrays; | ||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; | ||
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_FILENAME_OFFSET_KEY; | ||
import static org.apache.flink.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher.BINLOG_POSITION_OFFSET_KEY; | ||
|
||
|
@@ -206,11 +212,103 @@ private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOExcepti | |
String historyStr = DOCUMENT_WRITER.write(historyRecord.document()); | ||
|
||
Struct value = new Struct(schemaChangeValueSchema); | ||
- value.put(HistoryRecord.Fields.SOURCE, event.getSource()); | ||
+ value.put(HistoryRecord.Fields.SOURCE, rewriteTableNameIfNeeded(event)); | ||
value.put(HISTORY_RECORD_FIELD, historyStr); | ||
return value; | ||
} | ||
|
||
/** | ||
* Rewrites the table name in the Source if needed to handle schema changes properly. | ||
* | ||
* <p>This method addresses a specific issue when renaming multiple tables within a single | ||
* statement, such as: {@code RENAME TABLE customers TO customers_old, customers_copy TO | ||
* customers;}. | ||
* | ||
* <p>In such cases, Debezium's {@link io.debezium.connector.mysql.MySqlDatabaseSchema} | ||
* emits two separate change events: | ||
* | ||
* <ul> | ||
* <li>{@code RENAME TABLE customers TO customers_old} | ||
* <li>{@code RENAME TABLE customers_copy TO customers} | ||
* </ul> | ||
* | ||
* <p>Both events share a table name of {@code customers, customers_old} in their source | ||
* info, which includes multiple table IDs in a single string. | ||
* | ||
* <p>On the other hand, the {@code TableChanges.TableChange#id} correctly identifies the | ||
* schema change: | ||
* | ||
* <ul> | ||
* <li>The change for {@code RENAME TABLE customers_copy TO customers} has the {@code | ||
* customers} ID. | ||
* <li>The change for {@code RENAME TABLE customers TO customers_old} is empty. | ||
* </ul> | ||
* | ||
* <p>The problem arises because {@link | ||
* org.apache.flink.cdc.connectors.mysql.debezium.reader.BinlogSplitReader} does not expect | ||
* multiple table IDs in the source info. As a result, changes for tables defined by the | ||
* table filter configuration (e.g., {@code customers}) may be filtered out unintentionally. | ||
* This can lead to schema changes not being saved in the state, which is crucial for | ||
* recovering the job from a snapshot. | ||
* | ||
* <p>To resolve this issue, this method: | ||
* | ||
* <ol> | ||
* <li>Checks if the source info contains multiple table names. | ||
* <li>Verifies if the {@code TableChange#id} matches one of the table names. | ||
* <li>Updates the source info with the correct table name that conforms to Flink CDC | ||
* expectations, ensuring the schema change is saved correctly. | ||
* </ol> | ||
* | ||
* @param event the schema change event emitted by Debezium. | ||
* @return the updated source info with the corrected table name if necessary. | ||
*/ | ||
private Struct rewriteTableNameIfNeeded(SchemaChangeEvent event) { | ||
Struct sourceInfo = event.getSource(); | ||
String tableName = sourceInfo.getString(TABLE_NAME_KEY); | ||
if (tableName == null || tableName.isEmpty()) { | ||
return sourceInfo; | ||
} | ||
|
||
List<String> tableNames = parseTableNames(tableName); | ||
if (2 <= tableNames.size() && event.getDdl().toLowerCase().startsWith("rename")) { | ||
for (TableChanges.TableChange tableChange : event.getTableChanges()) { | ||
IIUC the tricky DDL event is But seems the rewrite logic will not work for the previous event (
Yes, the behavior is intentional. We don't need to handle events related to To ensure schema changes and data ingestion keep up correctly, it's sufficient to process rename events for tables like
Got it. Thanks for clarification! |
||
String changedTableName = getMatchingTableName(tableNames, tableChange.getId()); | ||
if (changedTableName != null) { | ||
LOG.debug( | ||
"Rewrite table name from {} to {} on swapping tables", | ||
tableName, | ||
changedTableName); | ||
sourceInfo.put(TABLE_NAME_KEY, changedTableName); | ||
} | ||
} | ||
} | ||
return sourceInfo; | ||
} | ||
|
||
/** | ||
* Decodes table names from a comma-separated string. | ||
* | ||
* <p>This method extracts individual table names from a string where multiple table names | ||
* are separated by commas. The input string is constructed by {@link | ||
* io.debezium.connector.mysql.SourceInfo}. | ||
* | ||
* @param tableName a comma-separated string containing multiple table names | ||
* @return a list of trimmed table names | ||
*/ | ||
private List<String> parseTableNames(String tableName) { | ||
return Arrays.stream(tableName.split(",")) | ||
.map(String::trim) | ||
.collect(Collectors.toList()); | ||
} | ||
|
||
private String getMatchingTableName(List<String> tableNames, TableId tableId) { | ||
return tableNames.stream() | ||
.filter(name -> name.equals(tableId.table())) | ||
.findFirst() | ||
.orElse(null); | ||
} | ||
|
||
@Override | ||
public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException { | ||
historizedSchema.applySchemaChange(event); | ||
|
Seems some other DDL events like `DROP TABLE A, B` also carry multiple tableIds in their `TABLE_NAME_KEY` field. Could they be rewritten by this method?

The rewrite logic explicitly checks that the DDL starts with `"RENAME"` (see the `startsWith("rename")` condition in `rewriteTableNameIfNeeded`), so other DDL events like `DROP TABLE` won't be affected by this method.

I was thinking there might be more DDLs that manipulate multiple tables besides `RENAME TABLE`, and this method could be extended to rewrite them, too. Maybe it should be done later in another ticket :)

That's a great idea! I agree that the method could be extended to handle other multi-table DDLs in the future. For now, I'll keep the focus on `RENAME TABLE`.
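
To make the behavior discussed in this thread concrete, here is a minimal, dependency-free sketch of the rewrite rule. It is not the PR's code: the class `RenameRewriteSketch` and the method `rewriteTableName` are hypothetical names, plain strings stand in for Debezium's `Struct`, `TableId`, and `TableChanges.TableChange`, and the `DROP TABLE` inputs are assumed values chosen only to exercise the guard.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the logic in the diff; no Debezium types are used.
final class RenameRewriteSketch {

    private RenameRewriteSketch() {}

    /** Splits a comma-separated table-name string and trims each entry. */
    static List<String> parseTableNames(String tableName) {
        return Arrays.stream(tableName.split(","))
                .map(String::trim)
                .collect(Collectors.toList());
    }

    /**
     * Returns the table name that would end up in the source info.
     *
     * @param tableName the (possibly comma-separated) name from the source info
     * @param ddl the DDL text of the event
     * @param changedTables assumed stand-in for the table part of each TableChange id
     */
    static String rewriteTableName(String tableName, String ddl, List<String> changedTables) {
        if (tableName == null || tableName.isEmpty()) {
            return tableName;
        }
        List<String> tableNames = parseTableNames(tableName);
        String result = tableName;
        // Only multi-table RENAME statements are rewritten; everything else passes through.
        if (tableNames.size() >= 2 && ddl.toLowerCase().startsWith("rename")) {
            for (String changed : changedTables) {
                if (tableNames.contains(changed)) {
                    result = changed; // as in the diff, the last matching change wins
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Swap case from the Javadoc: the change id matches one of the listed names.
        System.out.println(rewriteTableName(
                "customers, customers_old",
                "RENAME TABLE customers_copy TO customers",
                Arrays.asList("customers")));
        // The other half of the swap has no table change, so the name is left as-is.
        System.out.println(rewriteTableName(
                "customers, customers_old",
                "RENAME TABLE customers TO customers_old",
                Collections.<String>emptyList()));
        // A multi-table DROP is skipped by the "rename" guard (inputs are assumed).
        System.out.println(rewriteTableName(
                "A, B", "DROP TABLE A, B", Arrays.asList("A")));
    }
}
```

Under these assumptions, only the first call returns a single table name (`customers`); the other two return their input unchanged, which matches the replies above: the `customers_old` half of the swap and non-`RENAME` statements are intentionally left alone.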