Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sink: avoid memory allocations for transforming ColumnData to Column #11595

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,3 +1292,14 @@ type TopicPartitionKey struct {
PartitionKey string
TotalPartition int32
}

// GetColumnFlag returns column flag.
func GetColumnFlag(column *ColumnData, tb *TableInfo) *ColumnFlagType {
return tb.ColumnsFlag[column.ColumnID]
}

// GetColumnInfo returns column info.
func GetColumnInfo(column *ColumnData, tb *TableInfo) *model.ColumnInfo {
offset := tb.columnsOffset[column.ColumnID]
return tb.Columns[offset]
}
12 changes: 7 additions & 5 deletions cdc/sink/dmlsink/txn/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte {
var keys [][]byte
if len(row.Columns) != 0 {
for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset {
key := genKeyList(row.GetColumns(), iIdx, idxCol, row.GetTableID())
key := genKeyList(row.Columns, row.TableInfo, iIdx, idxCol, row.GetTableID())
if len(key) == 0 {
continue
}
Expand All @@ -82,7 +82,7 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte {
}
if len(row.PreColumns) != 0 {
for iIdx, idxCol := range row.TableInfo.IndexColumnsOffset {
key := genKeyList(row.GetPreColumns(), iIdx, idxCol, row.GetTableID())
key := genKeyList(row.PreColumns, row.TableInfo, iIdx, idxCol, row.GetTableID())
if len(key) == 0 {
continue
}
Expand All @@ -101,19 +101,21 @@ func genRowKeys(row *model.RowChangedEvent) [][]byte {
}

func genKeyList(
columns []*model.Column, iIdx int, colIdx []int, tableID int64,
columns []*model.ColumnData, tb *model.TableInfo, iIdx int, colIdx []int, tableID int64,
) []byte {
var key []byte
for _, i := range colIdx {
colInfo := model.GetColumnInfo(columns[i], tb)
colFlag := model.GetColumnFlag(columns[i], tb)
// if a column value is null, we can ignore this index
// If the index contain generated column, we can't use this key to detect conflict with other DML,
// Because such as insert can't specify the generated value.
if columns[i] == nil || columns[i].Value == nil || columns[i].Flag.IsGeneratedColumn() {
if columns[i] == nil || columns[i].Value == nil || colFlag.IsGeneratedColumn() {
return nil
}

val := model.ColumnValueString(columns[i].Value)
if columnNeeds2LowerCase(columns[i].Type, columns[i].Collation) {
if columnNeeds2LowerCase(colInfo.GetType(), colInfo.GetCollate()) {
val = strings.ToLower(val)
}

Expand Down
66 changes: 41 additions & 25 deletions cdc/sink/dmlsink/txn/mysql/dml.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,28 @@ import (
"strings"

"github.com/pingcap/tidb/pkg/parser/charset"
pmodel "github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/quotes"
)

// prepareUpdate builds a parametrics UPDATE statement as following
// sql: `UPDATE `test`.`t` SET {} = ?, {} = ? WHERE {} = ?, {} = {} LIMIT 1`
// `WHERE` conditions come from `preCols` and SET clause targets come from `cols`.
func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplicate bool) (string, []interface{}) {
func prepareUpdate(quoteTable string, preCols, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) {
var builder strings.Builder
builder.WriteString("UPDATE " + quoteTable + " SET ")

columnNames := make([]string, 0, len(cols))
args := make([]interface{}, 0, len(cols)+len(preCols))
for _, col := range cols {
if col == nil || col.Flag.IsGeneratedColumn() {
colInfo := model.GetColumnInfo(col, tb)
colFlag := model.GetColumnFlag(col, tb)
if col == nil || colFlag.IsGeneratedColumn() {
continue
}
columnNames = append(columnNames, col.Name)
args = appendQueryArgs(args, col)
columnNames = append(columnNames, colInfo.Name.O)
args = appendQueryArgs(args, col, colInfo)
}
if len(args) == 0 {
return "", nil
Expand All @@ -49,7 +52,7 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic
}

builder.WriteString(" WHERE ")
colNames, wargs := whereSlice(preCols, forceReplicate)
colNames, wargs := whereSlice(preCols, tb, forceReplicate)
if len(wargs) == 0 {
return "", nil
}
Expand All @@ -73,19 +76,22 @@ func prepareUpdate(quoteTable string, preCols, cols []*model.Column, forceReplic
// sql: `REPLACE INTO `test`.`t` VALUES (?,?,?)`
func prepareReplace(
quoteTable string,
cols []*model.Column,
cols []*model.ColumnData,
tb *model.TableInfo,
appendPlaceHolder bool,
translateToInsert bool,
) (string, []interface{}) {
var builder strings.Builder
columnNames := make([]string, 0, len(cols))
args := make([]interface{}, 0, len(cols))
for _, col := range cols {
if col == nil || col.Flag.IsGeneratedColumn() {
colInfo := model.GetColumnInfo(col, tb)
colFlag := model.GetColumnFlag(col, tb)
if col == nil || colFlag.IsGeneratedColumn() {
continue
}
columnNames = append(columnNames, col.Name)
args = appendQueryArgs(args, col)
columnNames = append(columnNames, colInfo.Name.O)
args = appendQueryArgs(args, col, colInfo)
}
if len(args) == 0 {
return "", nil
Expand All @@ -108,8 +114,9 @@ func prepareReplace(
// representation. Because if we use the byte array respresentation, the go-sql-driver
// will automatically set `_binary` charset for that column, which is not expected.
// See https://github.com/go-sql-driver/mysql/blob/ce134bfc/connection.go#L267
func appendQueryArgs(args []interface{}, col *model.Column) []interface{} {
if col.Charset != "" && col.Charset != charset.CharsetBin {
func appendQueryArgs(args []interface{}, col *model.ColumnData, colInfo *pmodel.ColumnInfo) []interface{} {
cst := colInfo.GetCharset()
if cst != "" && cst != charset.CharsetBin {
colValBytes, ok := col.Value.([]byte)
if ok {
args = append(args, string(colValBytes))
Expand All @@ -125,11 +132,11 @@ func appendQueryArgs(args []interface{}, col *model.Column) []interface{} {

// prepareDelete builds a parametric DELETE statement as following
// sql: `DELETE FROM `test`.`t` WHERE x = ? AND y >= ? LIMIT 1`
func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool) (string, []interface{}) {
func prepareDelete(quoteTable string, cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (string, []interface{}) {
var builder strings.Builder
builder.WriteString("DELETE FROM " + quoteTable + " WHERE ")

colNames, wargs := whereSlice(cols, forceReplicate)
colNames, wargs := whereSlice(cols, tb, forceReplicate)
if len(wargs) == 0 {
return "", nil
}
Expand All @@ -152,22 +159,31 @@ func prepareDelete(quoteTable string, cols []*model.Column, forceReplicate bool)

// whereSlice builds a parametric WHERE clause as following
// sql: `WHERE {} = ? AND {} > ?`
func whereSlice(cols []*model.Column, forceReplicate bool) (colNames []string, args []interface{}) {
// Try to use unique key values when available
for _, col := range cols {
if col == nil || !col.Flag.IsHandleKey() {
continue
}
colNames = append(colNames, col.Name)
args = appendQueryArgs(args, col)
}
// if no explicit row id but force replicate, use all key-values in where condition
func whereSlice(cols []*model.ColumnData, tb *model.TableInfo, forceReplicate bool) (colNames []string, args []interface{}) {
// If no explicit row id but force replicate, use all key-values in where condition.
if len(colNames) == 0 && forceReplicate {
colNames = make([]string, 0, len(cols))
args = make([]interface{}, 0, len(cols))
for _, col := range cols {
colNames = append(colNames, col.Name)
args = appendQueryArgs(args, col)
if col == nil {
continue
}
colInfo := model.GetColumnInfo(col, tb)
colNames = append(colNames, colInfo.Name.O)
args = appendQueryArgs(args, col, colInfo)
}
} else { // Try to use unique key values when available.
for _, col := range cols {
if col == nil {
continue
}
colFlag := model.GetColumnFlag(col, tb)
colInfo := model.GetColumnInfo(col, tb)
if !colFlag.IsHandleKey() {
continue
}
colNames = append(colNames, colInfo.Name.O)
args = appendQueryArgs(args, col, colInfo)
}
}
return
Expand Down
10 changes: 6 additions & 4 deletions cdc/sink/dmlsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,8 +571,9 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
if len(row.PreColumns) != 0 && len(row.Columns) != 0 {
query, args = prepareUpdate(
quoteTable,
row.GetPreColumns(),
row.GetColumns(),
row.PreColumns,
row.Columns,
row.TableInfo,
s.cfg.ForceReplicate)
if query != "" {
sqls = append(sqls, query)
Expand All @@ -584,7 +585,7 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {

// Delete Event
if len(row.PreColumns) != 0 {
query, args = prepareDelete(quoteTable, row.GetPreColumns(), s.cfg.ForceReplicate)
query, args = prepareDelete(quoteTable, row.PreColumns, row.TableInfo, s.cfg.ForceReplicate)
if query != "" {
sqls = append(sqls, query)
values = append(values, args)
Expand All @@ -598,7 +599,8 @@ func (s *mysqlBackend) prepareDMLs() *preparedDMLs {
if len(row.Columns) != 0 {
query, args = prepareReplace(
quoteTable,
row.GetColumns(),
row.Columns,
row.TableInfo,
true, /* appendPlaceHolder */
translateToInsert)
if query != "" {
Expand Down
Loading