From eafe094bbdc57aebaf005bb169c0d64a1ff72a59 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 11:33:51 +0800 Subject: [PATCH 1/3] init Signed-off-by: qupeng --- cdc/model/sink.go | 11 ++++++ cdc/sink/dmlsink/txn/event.go | 12 ++++--- cdc/sink/dmlsink/txn/mysql/dml.go | 56 ++++++++++++++++------------- cdc/sink/dmlsink/txn/mysql/mysql.go | 10 +++--- 4 files changed, 55 insertions(+), 34 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index d3e0d525bf7..825768c6fb8 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1292,3 +1292,14 @@ type TopicPartitionKey struct { PartitionKey string TotalPartition int32 } + +// GetColumnFlag returns column flag. +func GetColumnFlag(column *ColumnData, tb *TableInfo) ColumnFlagType { + return *tb.ColumnsFlag[column.ColumnID] +} + +// GetColumnInfo returns column info. +func GetColumnInfo(column *ColumnData, tb *TableInfo) *model.ColumnInfo { + offset := tb.columnsOffset[column.ColumnID] + return tb.Columns[offset] +} diff --git a/cdc/sink/dmlsink/txn/event.go b/cdc/sink/dmlsink/txn/event.go index 1632bce8823..532556394ad 100644 --- a/cdc/sink/dmlsink/txn/event.go +++ b/cdc/sink/dmlsink/txn/event.go @@ -73,7 +73,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { var keys [][]byte if len(row.Columns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.Columns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -82,7 +82,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } if len(row.PreColumns) != 0 { for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset { - key := genKeyList(row.GetPreColumns(), iIdx, idxCol, row.GetTableID()) + key := genKeyList(row.PreColumns, row.TableInfo, iIdx, idxCol, row.GetTableID()) if len(key) == 0 { continue } @@ -101,19 +101,21 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte { } func genKeyList( - columns []*model.Column, iIdx int, colIdx []int, tableID int64, + columns []*model.ColumnData, tb *model.TableInfo, iIdx int, colIdx []int, tableID int64, ) []byte { var key []byte for _, i := range colIdx { + colInfo := model.GetColumnInfo(columns[i], tb) + colFlag := model.GetColumnFlag(columns[i], tb) // if a column value is null, we can ignore this index // If the index contain generated column, we can't use this key to detect conflict with other DML, // Because such as insert can't specify the generated value. - if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() { + if columns[i] == nil || columns[i].Value == nil || colFlag.IsGeneratedColumn() { return nil } val := model.ColumnValueString(columns[i].Value) - if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) { + if columnNeeds2LowerCase(colInfo.GetType(), colInfo.GetCollate()) { val = strings.ToLower(val) } diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index da29618908f..115acf7ce7a 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -17,6 +17,7 @@ import ( "strings" "github.com/pingcap/tidb/pkg/parser/charset" + pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/quotes" ) @@ -24,18 +25,20 @@ import ( // prepareUpdate builds a parametrics UPDATE statement as following // sql: `UPDATE `test`.`t` SET {} = ?, {} = ? WHERE {} = ?, {} = {} LIMIT 1` // `WHERE` conditions come from `preCols` and SET clause targets come from `cols`. -func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareUpdate(quoteTable string, preCols, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("UPDATE " + quoteTable + " SET ") columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)+len(preCols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colInfo := model.GetColumnInfo(col, tb) + colFlag := model.GetColumnFlag(col, tb) + if col == nil || colFlag.IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } if len(args) == 0 { return "", nil @@ -49,7 +52,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic } builder.WriteString(" WHERE ") - colNames, wargs := whereSlice(preCols, forceReplicate) + colNames, wargs := whereSlice(preCols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -73,7 +76,8 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic // sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)` func prepareReplace( quoteTable string, - cols []*model.Column, + cols []*model.ColumnData, + tb *model.TableInfo, appendPlaceHolder bool, translateToInsert bool, ) (string, []interface{}) { @@ -81,11 +85,13 @@ func prepareReplace( columnNames := make([]string, 0, len(cols)) args := make([]interface{}, 0, len(cols)) for _, col := range cols { - if col == nil || col.Flag.IsGeneratedColumn() { + colInfo := model.GetColumnInfo(col, tb) + colFlag := model.GetColumnFlag(col, tb) + if col == nil || colFlag.IsGeneratedColumn() { continue } - columnNames = append(columnNames, col.Name) - args = appendQueryArgs(args, col) + columnNames = append(columnNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } if len(args) == 0 { return "", nil @@ -108,8 +114,9 @@ func prepareReplace( // representation. Because if we use the byte array respresentation, the go-sql-driver // will automatically set `_binary` charset for that column, which is not expected. // See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267 -func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { - if col.Charset != "" && col.Charset != charset.CharsetBin { +func appendQueryArgs(args []interface{}, col *model.ColumnData, colInfo *pmodel.ColumnInfo) []interface{} { + cst := colInfo.GetCharset() + if cst != "" && cst != charset.CharsetBin { colValBytes, ok := col.Value.([]byte) if ok { args = append(args, string(colValBytes)) @@ -125,11 +132,11 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} { // prepareDelete builds a parametric DELETE statement as following // sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1` -func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) (string, []interface{}) { +func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) { var builder strings.Builder builder.WriteString("DELETE FROM " + quoteTable + " WHERE ") - colNames, wargs := whereSlice(cols, forceReplicate) + colNames, wargs := whereSlice(cols, tb, forceReplicate) if len(wargs) == 0 { return "", nil } @@ -152,23 +159,22 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` -func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, args []interface{}) { - // Try to use unique key values when available - for _, col := range cols { - if col == nil || !col.Flag.IsHandleKey() { - continue - } - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) - } +func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { // if no explicit row id but force replicate, use all key-values in where condition if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) - for _, col := range cols { - colNames = append(colNames, col.Name) - args = appendQueryArgs(args, col) + } + + // Try to use unique key values when available + for _, col := range cols { + colFlag := model.GetColumnFlag(col, tb) + colInfo := model.GetColumnInfo(col, tb) + if col == nil || !colFlag.IsHandleKey() { + continue } + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } return } diff --git a/cdc/sink/dmlsink/txn/mysql/mysql.go b/cdc/sink/dmlsink/txn/mysql/mysql.go index 27fddeeafa7..710f87ba09f 100644 --- a/cdc/sink/dmlsink/txn/mysql/mysql.go +++ b/cdc/sink/dmlsink/txn/mysql/mysql.go @@ -571,8 +571,9 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.PreColumns) != 0 && len(row.Columns) != 0 { query, args = prepareUpdate( quoteTable, - row.GetPreColumns(), - row.GetColumns(), + row.PreColumns, + row.Columns, + row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) @@ -584,7 +585,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { // Delete Event if len(row.PreColumns) != 0 { - query, args = prepareDelete(quoteTable, row.GetPreColumns(), s.cfg.ForceReplicate) + query, args = prepareDelete(quoteTable, row.PreColumns, row.TableInfo, s.cfg.ForceReplicate) if query != "" { sqls = append(sqls, query) values = append(values, args) @@ -598,7 +599,8 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs { if len(row.Columns) != 0 { query, args = prepareReplace( quoteTable, - row.GetColumns(), + row.Columns, + row.TableInfo, true, /* appendPlaceHolder */ translateToInsert) if query != "" { From edf2f2c99725a78ebe76d984e1d1f79c7a4821ea Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 12:09:58 +0800 Subject: [PATCH 2/3] fix Signed-off-by: qupeng --- cdc/model/sink.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cdc/model/sink.go b/cdc/model/sink.go index 825768c6fb8..41a81a0e6f5 100644 --- a/cdc/model/sink.go +++ b/cdc/model/sink.go @@ -1294,8 +1294,8 @@ type TopicPartitionKey struct { } // GetColumnFlag returns column flag. -func GetColumnFlag(column *ColumnData, tb *TableInfo) ColumnFlagType { - return *tb.ColumnsFlag[column.ColumnID] +func GetColumnFlag(column *ColumnData, tb *TableInfo) *ColumnFlagType { + return tb.ColumnsFlag[column.ColumnID] } // GetColumnInfo returns column info. From 3f7854d8036742c21b13054acb2ae24629287656 Mon Sep 17 00:00:00 2001 From: qupeng Date: Fri, 13 Sep 2024 12:19:05 +0800 Subject: [PATCH 3/3] more changes Signed-off-by: qupeng --- cdc/sink/dmlsink/txn/mysql/dml.go | 32 ++++++++++++++++++++----------- 1 file changed, 21 insertions(+), 11 deletions(-) diff --git a/cdc/sink/dmlsink/txn/mysql/dml.go b/cdc/sink/dmlsink/txn/mysql/dml.go index 115acf7ce7a..cb91912756f 100644 --- a/cdc/sink/dmlsink/txn/mysql/dml.go +++ b/cdc/sink/dmlsink/txn/mysql/dml.go @@ -160,21 +160,31 @@ func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableI // whereSlice builds a parametric WHERE clause as following // sql: `WHERE {} = ? AND {} > ?` func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) { - // if no explicit row id but force replicate, use all key-values in where condition + // If no explicit row id but force replicate, use all key-values in where condition. if len(colNames) == 0 && forceReplicate { colNames = make([]string, 0, len(cols)) args = make([]interface{}, 0, len(cols)) - } - - // Try to use unique key values when available - for _, col := range cols { - colFlag := model.GetColumnFlag(col, tb) - colInfo := model.GetColumnInfo(col, tb) - if col == nil || !colFlag.IsHandleKey() { - continue + for _, col := range cols { + if col == nil { + continue + } + colInfo := model.GetColumnInfo(col, tb) + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) + } + } else { // Try to use unique key values when available. + for _, col := range cols { + if col == nil { + continue + } + colFlag := model.GetColumnFlag(col, tb) + colInfo := model.GetColumnInfo(col, tb) + if !colFlag.IsHandleKey() { + continue + } + colNames = append(colNames, colInfo.Name.O) + args = appendQueryArgs(args, col, colInfo) } - colNames = append(colNames, colInfo.Name.O) - args = appendQueryArgs(args, col, colInfo) } return }