Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sync-diff-inspector: Use MD5 instead of CRC32 for checksum #707

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
wg.Add(1)
go func() {
defer wg.Done()
upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange)
upstreamInfo = df.upstream.GetCountAndMD5(ctx, tableRange)
}()
downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange)
downstreamInfo = df.downstream.GetCountAndMD5(ctx, tableRange)
wg.Wait()

if upstreamInfo.Err != nil {
Expand All @@ -601,7 +601,7 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum {
return true, upstreamInfo.Count, downstreamInfo.Count, nil
}
log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum))
log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.String("upstream checksum", upstreamInfo.Checksum), zap.String("downstream checksum", downstreamInfo.Checksum))
return false, upstreamInfo.Count, downstreamInfo.Count, nil
}

Expand Down
41 changes: 25 additions & 16 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"time"

tableFilter "github.com/pingcap/tidb-tools/pkg/table-filter"
Expand Down Expand Up @@ -93,42 +94,50 @@ func (s *MySQLSources) Close() {
}
}

func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
func (s *MySQLSources) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()

matchSources := getMatchedSourcesForTable(s.sourceTablesMap, table)
infoCh := make(chan *ChecksumInfo, len(s.sourceTablesMap))

type checksumInfo struct {
lMD5 uint64
rMD5 uint64
count int64
err error
}
infoCh := make(chan *checksumInfo, len(s.sourceTablesMap))
for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Err: err,
count, lmd5, rmd5, err := utils.GetCountAndMD5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &checksumInfo{
lMD5: lmd5,
rMD5: rmd5,
count: count,
err: err,
}
}(ms)
}
defer close(infoCh)

var (
err error
totalCount int64
totalChecksum int64
err error
totalCount int64
totalLMD5 uint64
totalRMD5 uint64
)

for range matchSources {
info := <-infoCh
// catch the first error
if err == nil && info.Err != nil {
err = info.Err
if err == nil && info.err != nil {
err = info.err
}
totalCount += info.Count
totalChecksum ^= info.Checksum
totalCount += info.count
totalLMD5 ^= info.lMD5
totalRMD5 ^= info.rMD5
}

totalChecksum := strconv.FormatUint(totalLMD5, 16) + strconv.FormatUint(totalRMD5, 16)
cost := time.Since(beginTime)
return &ChecksumInfo{
Checksum: totalChecksum,
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
)

type ChecksumInfo struct {
Checksum int64
Checksum string
Count int64
Err error
Cost time.Duration
Expand Down Expand Up @@ -82,8 +82,8 @@ type Source interface {
// there are many workers consume the range from the channel to compare.
GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error)

// GetCountAndCrc32 gets the crc32 result and the count from given range.
GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo
// GetCountAndMD5 gets the MD5 checksum and the count for the given range.
GetCountAndMD5(context.Context, *splitter.RangeInfo) *ChecksumInfo

// GetCountForLackTable gets the count for tables that don't exist upstream or downstream.
GetCountForLackTable(context.Context, *splitter.RangeInfo) int64
Expand Down
14 changes: 7 additions & 7 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,12 @@ func TestTiDBSource(t *testing.T) {
require.NoError(t, err)
require.True(t, check)
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)
countRows := sqlmock.NewRows([]string{"CNT", "LMD5", "RMD5"}).AddRow(123, 456, 789)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := tidb.GetCountAndMD5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(123))
require.Equal(t, checksum.Checksum, int64(456))
require.Equal(t, checksum.Checksum, "1c8315")
}

// Test ChunkIterator
Expand Down Expand Up @@ -390,17 +390,17 @@ func TestMysqlShardSources(t *testing.T) {

for n, tableCase := range tableCases {
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
var resChecksum int64 = 0
var resChecksum uint64 = 0
for i := 0; i < len(dbs); i++ {
resChecksum = resChecksum + 1<<i
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(1, 1<<i)
countRows := sqlmock.NewRows([]string{"CNT", "LMD5", "RMD5"}).AddRow(1, 1<<i, 1<<i)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
}

checksum := shard.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := shard.GetCountAndMD5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(len(dbs)))
require.Equal(t, checksum.Checksum, resChecksum)
require.Equal(t, checksum.Checksum, strconv.FormatUint(resChecksum, 16)+strconv.FormatUint(resChecksum, 16))
}

// Test RowIterator
Expand Down
8 changes: 5 additions & 3 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"database/sql"
"fmt"
"strconv"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -120,14 +121,15 @@ func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo
func (s *TiDBSource) Close() {
s.dbConn.Close()
}
func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {

func (s *TiDBSource) GetCountAndMD5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()

matchSource := getMatchSource(s.sourceTableMap, table)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)

count, lmd5, rmd5, err := utils.GetCountAndMD5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
checksum := strconv.FormatUint(lmd5, 16) + strconv.FormatUint(rmd5, 16)
cost := time.Since(beginTime)
return &ChecksumInfo{
Checksum: checksum,
Expand Down
41 changes: 19 additions & 22 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,16 +743,16 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
return dataSize.Int64, nil
}

// GetCountAndCRC32Checksum returns checksum code and count of some data by given condition
func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) {
// GetCountAndMD5Checksum returns the row count and the XOR-aggregated left and right 64-bit halves of the per-row MD5 checksum for the data matching the given condition.
func GetCountAndMD5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, uint64, error) {
/*
calculate CRC32 checksum and count example:
mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0;
+--------+------------+
| CNT | CHECKSUM |
+--------+------------+
| 100000 | 1128664311 |
+--------+------------+
calculate MD5 checksum and count example:
mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED)) LMD5, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) RMD5 FROM `a`.`t`;
+--------+---------------------+----------------------+
| CNT    | LMD5                | RMD5                 |
+--------+---------------------+----------------------+
| 100000 | 3462532621352132810 | 17515372630935707780 |
+--------+---------------------+----------------------+
1 row in set (0.46 sec)
*/
columnNames := make([]string, 0, len(tbInfo.Columns))
Expand All @@ -770,24 +770,21 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name))
}

query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED)) LMD5, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) RMD5 FROM %s WHERE %s;",
Copy link
Contributor

@Leavrth Leavrth Mar 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😭 Unfortunately, it will take twice as long.
Currently, TiDB has not optimized this kind of SQL, so the subexpression MD5(CONCAT_WS(',', %s, CONCAT(%s))) will be calculated twice: once for LMD5 and once for RMD5.
pingcap/tidb#39576

strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args))

var count sql.NullInt64
var checksum sql.NullInt64
err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum)
var (
count int64
lmd5 uint64
rmd5 uint64
)
err := db.QueryRowContext(ctx, query, args...).Scan(&count, &lmd5, &rmd5)
if err != nil {
log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
return -1, -1, errors.Trace(err)
return -1, 0, 0, errors.Trace(err)
}
if !count.Valid || !checksum.Valid {
// if don't have any data, the checksum will be `NULL`
log.Warn("get empty count or checksum", zap.String("sql", query), zap.Reflect("args", args))
return 0, 0, nil
}

return count.Int64, checksum.Int64, nil
return count, lmd5, rmd5, nil
}

// GetRandomValues returns some random values. Different from /pkg/dbutil.GetRandomValues, it returns multi-columns at the same time.
Expand Down
9 changes: 5 additions & 4 deletions sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ func TestBasicTableUtilOperation(t *testing.T) {
require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 1)
}

func TestGetCountAndCRC32Checksum(t *testing.T) {
func TestGetCountAndMD5ChecksumChecksum(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

Expand All @@ -241,12 +241,13 @@ func TestGetCountAndCRC32Checksum(t *testing.T) {
tableInfo, err := dbutil.GetTableInfoBySQL(createTableSQL, parser.New())
require.NoError(t, err)

mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456))
mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "LMD5", "RMD5"}).AddRow(123, 456, 789))

count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
count, lmd5, rmd5, err := GetCountAndMD5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
require.NoError(t, err)
require.Equal(t, count, int64(123))
require.Equal(t, checksum, int64(456))
require.Equal(t, lmd5, uint64(0x1c8))
require.Equal(t, rmd5, uint64(0x315))
}

func TestGetApproximateMid(t *testing.T) {
Expand Down