Skip to content

Commit

Permalink
Use md5 checksum instead of crc32 (#787)
Browse files Browse the repository at this point in the history
close #634
  • Loading branch information
michaelmdeng authored Aug 9, 2024
1 parent 202dcf9 commit 21a5788
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 35 deletions.
6 changes: 3 additions & 3 deletions sync_diff_inspector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,9 +583,9 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
wg.Add(1)
go func() {
defer wg.Done()
upstreamInfo = df.upstream.GetCountAndCrc32(ctx, tableRange)
upstreamInfo = df.upstream.GetCountAndMd5(ctx, tableRange)
}()
downstreamInfo = df.downstream.GetCountAndCrc32(ctx, tableRange)
downstreamInfo = df.downstream.GetCountAndMd5(ctx, tableRange)
wg.Wait()

if upstreamInfo.Err != nil {
Expand All @@ -601,7 +601,7 @@ func (df *Diff) compareChecksumAndGetCount(ctx context.Context, tableRange *spli
if upstreamInfo.Count == downstreamInfo.Count && upstreamInfo.Checksum == downstreamInfo.Checksum {
return true, upstreamInfo.Count, downstreamInfo.Count, nil
}
log.Debug("checksum failed", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Int64("upstream checksum", upstreamInfo.Checksum), zap.Int64("downstream checksum", downstreamInfo.Checksum))
log.Debug("checksum doesn't match", zap.Any("chunk id", tableRange.ChunkRange.Index), zap.String("table", df.workSource.GetTables()[tableRange.GetTableIndex()].Table), zap.Int64("upstream chunk size", upstreamInfo.Count), zap.Int64("downstream chunk size", downstreamInfo.Count), zap.Uint64("upstream checksum", upstreamInfo.Checksum), zap.Uint64("downstream checksum", downstreamInfo.Checksum))
return false, upstreamInfo.Count, downstreamInfo.Count, nil
}

Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/mysql_shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *MySQLSources) Close() {
}
}

func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
func (s *MySQLSources) GetCountAndMd5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()
Expand All @@ -103,7 +103,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte

for _, ms := range matchSources {
go func(ms *common.TableShardSource) {
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndMd5Checksum(ctx, ms.DBConn, ms.OriginSchema, ms.OriginTable, table.Info, chunk.Where, chunk.Args)
infoCh <- &ChecksumInfo{
Checksum: checksum,
Count: count,
Expand All @@ -116,7 +116,7 @@ func (s *MySQLSources) GetCountAndCrc32(ctx context.Context, tableRange *splitte
var (
err error
totalCount int64
totalChecksum int64
totalChecksum uint64
)

for range matchSources {
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/source/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const (
)

type ChecksumInfo struct {
Checksum int64
Checksum uint64
Count int64
Err error
Cost time.Duration
Expand Down Expand Up @@ -82,8 +82,8 @@ type Source interface {
// there are many workers consume the range from the channel to compare.
GetRangeIterator(context.Context, *splitter.RangeInfo, TableAnalyzer, int) (RangeIterator, error)

// GetCountAndCrc32 gets the crc32 result and the count from given range.
GetCountAndCrc32(context.Context, *splitter.RangeInfo) *ChecksumInfo
// GetCountAndMd5 gets the MD5-based checksum and the row count for the given range.
GetCountAndMd5(context.Context, *splitter.RangeInfo) *ChecksumInfo

// GetCountForLackTable gets the count for tables that don't exist upstream or downstream.
GetCountForLackTable(context.Context, *splitter.RangeInfo) int64
Expand Down
8 changes: 4 additions & 4 deletions sync_diff_inspector/source/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,10 @@ func TestTiDBSource(t *testing.T) {
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
checksum := tidb.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := tidb.GetCountAndMd5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(123))
require.Equal(t, checksum.Checksum, int64(456))
require.Equal(t, checksum.Checksum, uint64(456))
}

// Test ChunkIterator
Expand Down Expand Up @@ -392,14 +392,14 @@ func TestMysqlShardSources(t *testing.T) {

for n, tableCase := range tableCases {
require.Equal(t, n, tableCase.rangeInfo.GetTableIndex())
var resChecksum int64 = 0
var resChecksum uint64 = 0
for i := 0; i < len(dbs); i++ {
resChecksum = resChecksum + 1<<i
countRows := sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(1, 1<<i)
mock.ExpectQuery("SELECT COUNT.*").WillReturnRows(countRows)
}

checksum := shard.GetCountAndCrc32(ctx, tableCase.rangeInfo)
checksum := shard.GetCountAndMd5(ctx, tableCase.rangeInfo)
require.NoError(t, checksum.Err)
require.Equal(t, checksum.Count, int64(len(dbs)))
require.Equal(t, checksum.Checksum, resChecksum)
Expand Down
4 changes: 2 additions & 2 deletions sync_diff_inspector/source/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,13 @@ func (s *TiDBSource) GetRangeIterator(ctx context.Context, r *splitter.RangeInfo
// Close closes the source's database connection (s.dbConn).
// Any error returned by the connection's Close call is discarded.
func (s *TiDBSource) Close() {
s.dbConn.Close()
}
func (s *TiDBSource) GetCountAndCrc32(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
func (s *TiDBSource) GetCountAndMd5(ctx context.Context, tableRange *splitter.RangeInfo) *ChecksumInfo {
beginTime := time.Now()
table := s.tableDiffs[tableRange.GetTableIndex()]
chunk := tableRange.GetChunk()

matchSource := getMatchSource(s.sourceTableMap, table)
count, checksum, err := utils.GetCountAndCRC32Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)
count, checksum, err := utils.GetCountAndMd5Checksum(ctx, s.dbConn, matchSource.OriginSchema, matchSource.OriginTable, table.Info, chunk.Where, chunk.Args)

cost := time.Since(beginTime)
return &ChecksumInfo{
Expand Down
33 changes: 16 additions & 17 deletions sync_diff_inspector/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -765,16 +765,16 @@ func GetTableSize(ctx context.Context, db *sql.DB, schemaName, tableName string)
return dataSize.Int64, nil
}

// GetCountAndCRC32Checksum returns checksum code and count of some data by given condition
func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, int64, error) {
// GetCountAndMd5Checksum returns the row count and the MD5-based checksum of the rows matching the given condition.
func GetCountAndMd5Checksum(ctx context.Context, db *sql.DB, schemaName, tableName string, tbInfo *model.TableInfo, limitRange string, args []interface{}) (int64, uint64, error) {
/*
calculate CRC32 checksum and count example:
mysql> select count(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', id, name, age, CONCAT(ISNULL(id), ISNULL(name), ISNULL(age))))AS UNSIGNED)) as CHECKSUM from test.test where id > 0;
+--------+------------+
| CNT | CHECKSUM |
+--------+------------+
| 100000 | 1128664311 |
+--------+------------+
calculate MD5 checksum and count example:
mysql> SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', `id`, `name`, CONCAT(ISNULL(`id`), ISNULL(`name`)))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM `a`.`t`;
+--------+---------------------+
| CNT    | CHECKSUM            |
+--------+---------------------+
| 100000 | 3462532621352132810 |
+--------+---------------------+
1 row in set (0.46 sec)
*/
columnNames := make([]string, 0, len(tbInfo.Columns))
Expand All @@ -796,24 +796,23 @@ func GetCountAndCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, table
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(%s)", name))
}

query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
query := fmt.Sprintf("SELECT COUNT(*) as CNT, BIT_XOR(CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 1, 16), 16, 10) AS UNSIGNED) ^ CAST(CONV(SUBSTRING(MD5(CONCAT_WS(',', %s, CONCAT(%s))), 17, 16), 16, 10) AS UNSIGNED)) as CHECKSUM FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), dbutil.TableName(schemaName, tableName), limitRange)
log.Debug("count and checksum", zap.String("sql", query), zap.Reflect("args", args))

var count sql.NullInt64
var checksum sql.NullInt64
var checksum uint64
err := db.QueryRowContext(ctx, query, args...).Scan(&count, &checksum)
if err != nil {
log.Warn("execute checksum query fail", zap.String("query", query), zap.Reflect("args", args), zap.Error(err))
return -1, -1, errors.Trace(err)
return -1, 0, errors.Trace(err)
}
if !count.Valid || !checksum.Valid {
if !count.Valid {
// if don't have any data, the checksum will be `NULL`
log.Warn("get empty count or checksum", zap.String("sql", query), zap.Reflect("args", args))
log.Warn("get empty count", zap.String("sql", query), zap.Reflect("args", args))
return 0, 0, nil
}

return count.Int64, checksum.Int64, nil
return count.Int64, checksum, nil
}

// GetRandomValues returns some random values. Different from /pkg/dbutil.GetRandomValues, it returns multi-columns at the same time.
Expand Down
6 changes: 3 additions & 3 deletions sync_diff_inspector/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ func TestBasicTableUtilOperation(t *testing.T) {
require.Equal(t, tableInfo.Indices[0].Columns[1].Offset, 1)
}

func TestGetCountAndCRC32Checksum(t *testing.T) {
func TestGetCountAndMd5Checksum(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()

Expand All @@ -271,10 +271,10 @@ func TestGetCountAndCRC32Checksum(t *testing.T) {

mock.ExpectQuery("SELECT COUNT.*FROM `test_schema`\\.`test_table` WHERE \\[23 45\\].*").WithArgs("123", "234").WillReturnRows(sqlmock.NewRows([]string{"CNT", "CHECKSUM"}).AddRow(123, 456))

count, checksum, err := GetCountAndCRC32Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
count, checksum, err := GetCountAndMd5Checksum(ctx, conn, "test_schema", "test_table", tableInfo, "[23 45]", []interface{}{"123", "234"})
require.NoError(t, err)
require.Equal(t, count, int64(123))
require.Equal(t, checksum, int64(456))
require.Equal(t, checksum, uint64(0x1c8))
}

func TestGetApproximateMid(t *testing.T) {
Expand Down

0 comments on commit 21a5788

Please sign in to comment.