diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go index 043e7eb4..775cb619 100644 --- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go +++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder.go @@ -276,35 +276,41 @@ func (b *BasicUndoLogBuilder) buildLockKey(rows driver.Rows, meta types.TableMet // the string as local key. the local key example(multi pk): "t_user:1_a,2_b" func (b *BasicUndoLogBuilder) buildLockKey2(records *types.RecordImage, meta types.TableMeta) string { - var ( - lockKeys bytes.Buffer - filedSequence int - ) + var lockKeys bytes.Buffer lockKeys.WriteString(meta.TableName) lockKeys.WriteString(":") keys := meta.GetPrimaryKeyOnlyName() + keyIndexMap := make(map[string]int, len(keys)) - for _, row := range records.Rows { - if filedSequence > 0 { + for idx, columnName := range keys { + keyIndexMap[columnName] = idx + } + + primaryKeyRows := make([][]interface{}, len(records.Rows)) + + for i, row := range records.Rows { + primaryKeyValues := make([]interface{}, len(keys)) + for _, column := range row.Columns { + if idx, exist := keyIndexMap[column.ColumnName]; exist { + primaryKeyValues[idx] = column.Value + } + } + primaryKeyRows[i] = primaryKeyValues + } + + for i, primaryKeyValues := range primaryKeyRows { + if i > 0 { lockKeys.WriteString(",") } - pkSplitIndex := 0 - for _, column := range row.Columns { - var hasKeyColumn bool - for _, key := range keys { - if column.ColumnName == key { - hasKeyColumn = true - if pkSplitIndex > 0 { - lockKeys.WriteString("_") - } - lockKeys.WriteString(fmt.Sprintf("%v", column.Value)) - pkSplitIndex++ - } + for j, pkVal := range primaryKeyValues { + if j > 0 { + lockKeys.WriteString("_") } - if hasKeyColumn { - filedSequence++ + if pkVal == nil { + continue } + lockKeys.WriteString(fmt.Sprintf("%v", pkVal)) } } diff --git a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go index 152c0a47..465bf516 100644 --- a/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go +++ b/pkg/datasource/sql/undo/builder/basic_undo_log_builder_test.go @@ -50,22 +50,135 @@ func TestBuildWhereConditionByPKs(t *testing.T) { } func TestBuildLockKey(t *testing.T) { - metaData := types.TableMeta{ - TableName: "test_name", - Indexs: map[string]types.IndexMeta{ - "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{{ColumnName: "id"}, {ColumnName: "userId"}}}, - }, + var builder BasicUndoLogBuilder + + columnID := types.ColumnMeta{ + ColumnName: "id", + } + columnUserId := types.ColumnMeta{ + ColumnName: "userId", + } + columnName := types.ColumnMeta{ + ColumnName: "name", + } + columnAge := types.ColumnMeta{ + ColumnName: "age", } + columnNonExistent := types.ColumnMeta{ + ColumnName: "non_existent", + } + + columnsTwoPk := []types.ColumnMeta{columnID, columnUserId} + columnsMixPk := []types.ColumnMeta{columnName, columnAge} - records := types.RecordImage{ - TableName: "test_name", - Rows: []types.RowImage{ - {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 1}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "one"}}}, - {Columns: []types.ColumnImage{{KeyType: types.IndexTypePrimaryKey, ColumnName: "id", Value: 2}, {KeyType: types.IndexTypePrimaryKey, ColumnName: "userId", Value: "two"}}}, + getColumnImage := func(columnName string, value interface{}) types.ColumnImage { + return types.ColumnImage{KeyType: types.IndexTypePrimaryKey, ColumnName: columnName, Value: value} + } + + tests := []struct { + name string + metaData types.TableMeta + records types.RecordImage + expected string + }{ + { + "Two Primary Keys", + types.TableMeta{ + TableName: "test_name", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsTwoPk}, + }, + }, + types.RecordImage{ + TableName: "test_name", + Rows: []types.RowImage{ + {[]types.ColumnImage{getColumnImage("id", 1), getColumnImage("userId", "one")}}, + {[]types.ColumnImage{getColumnImage("id", 2), getColumnImage("userId", "two")}}, + }, + }, + "test_name:1_one,2_two", + }, + { + name: "Single Primary Key", + metaData: types.TableMeta{ + TableName: "single_key", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}}, + }, + }, + records: types.RecordImage{ + TableName: "single_key", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{getColumnImage("id", 100)}}, + }, + }, + expected: "single_key:100", + }, + { + name: "Mixed Type Keys", + metaData: types.TableMeta{ + TableName: "mixed_key", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: columnsMixPk}, + }, + }, + records: types.RecordImage{ + TableName: "mixed_key", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{getColumnImage("name", "Alice"), getColumnImage("age", 25)}}, + }, + }, + expected: "mixed_key:Alice_25", + }, + { + name: "Empty Records", + metaData: types.TableMeta{ + TableName: "empty", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}}, + }, + }, + records: types.RecordImage{TableName: "empty"}, + expected: "empty:", + }, + { + name: "Special Characters", + metaData: types.TableMeta{ + TableName: "special", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnID}}, + }, + }, + records: types.RecordImage{ + TableName: "special", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{getColumnImage("id", "a,b_c")}}, + }, + }, + expected: "special:a,b_c", + }, + { + name: "Non-existent Key Name", + metaData: types.TableMeta{ + TableName: "error_key", + Indexs: map[string]types.IndexMeta{ + "PRIMARY_KEY": {IType: types.IndexTypePrimaryKey, Columns: []types.ColumnMeta{columnNonExistent}}, + }, + }, + records: types.RecordImage{ + TableName: "error_key", + Rows: []types.RowImage{ + {Columns: []types.ColumnImage{getColumnImage("id", 1)}}, + }, + }, + expected: "error_key:", }, } - builder := BasicUndoLogBuilder{} - lockKeys := builder.buildLockKey2(&records, metaData) - assert.Equal(t, "test_name:1_one,2_two", lockKeys) + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + lockKeys := builder.buildLockKey2(&tt.records, tt.metaData) + assert.Equal(t, tt.expected, lockKeys) + }) + } }