Skip to content

Commit

Permalink
diff: can use multiple columns to split chunks (#197)
Browse files Browse the repository at this point in the history
* sync_diff_inspector: fix a misleading regex example in config.toml (#141)

* update pkg about database (#142)

* diff: add database name and table name router (#172)

* simplify the if/else logic (#179)

refactor to simplify the if/else logic

* diff: can use multiple columns to split chunks (#130)

* simplify the if/else logic (#191)
  • Loading branch information
WangXiangUSTC authored Feb 13, 2019
1 parent 170311c commit 1a02077
Show file tree
Hide file tree
Showing 16 changed files with 1,317 additions and 458 deletions.
242 changes: 218 additions & 24 deletions pkg/dbutil/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb-tools/pkg/utils"
"github.com/pingcap/tidb/types"
log "github.com/sirupsen/logrus"
)

Expand All @@ -37,6 +39,9 @@ const (
var (
// ErrVersionNotFound means can't get the database's version
ErrVersionNotFound = errors.New("can't get the database's version")

// ErrNoData means no data in table
ErrNoData = errors.New("no data found")
)

// DBConfig is database configuration.
Expand Down Expand Up @@ -164,48 +169,101 @@ func GetRowCount(ctx context.Context, db *sql.DB, schemaName string, tableName s
return cnt.Int64, nil
}

// GetRandomValues returns some random value of a column.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int64, min, max interface{}, limitRange string, collation string) ([]interface{}, error) {
// GetRandomValues returns some random value and these value's count of a column, just like sampling. Tips: limitArgs is the value in limitRange.
func GetRandomValues(ctx context.Context, db *sql.DB, schemaName, table, column string, num int, limitRange string, limitArgs []interface{}, collation string) ([]string, []int, error) {
/*
example:
mysql> SELECT `id` FROM (SELECT `id` FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 AND true ORDER BY RAND() LIMIT 3)rand_tmp ORDER BY `id` COLLATE "latin1_bin";
+----------+
| rand_tmp |
+----------+
| 15 |
| 58 |
| 67 |
+----------+
mysql> SELECT `id`, COUNT(*) count FROM (SELECT `id` FROM `test`.`test` WHERE `id` COLLATE "latin1_bin" > 0 AND `id` COLLATE "latin1_bin" < 100 ORDER BY RAND() LIMIT 5) rand_tmp GROUP BY `id` ORDER BY `id` COLLATE "latin1_bin";
+------+-------+
| id | count |
+------+-------+
| 1 | 2 |
| 2 | 2 |
| 3 | 1 |
+------+-------+
FIXME: TiDB now don't return rand value when use `ORDER BY RAND()`
*/

if limitRange != "" {
limitRange = "true"
if limitRange == "" {
limitRange = "TRUE"
}

if collation != "" {
collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
}

randomValue := make([]interface{}, 0, num)
query := fmt.Sprintf("SELECT `%s` FROM (SELECT `%s` FROM `%s`.`%s` WHERE `%s`%s > ? AND `%s`%s < ? AND %s ORDER BY RAND() LIMIT %d)rand_tmp ORDER BY `%s`%s",
column, column, schemaName, table, column, collation, column, collation, limitRange, num, column, collation)
log.Debugf("get random values sql: %s, min: %v, max: %v", query, min, max)
rows, err := db.QueryContext(ctx, query, min, max)
randomValue := make([]string, 0, num)
valueCount := make([]int, 0, num)

query := fmt.Sprintf("SELECT %[1]s, COUNT(*) count FROM (SELECT %[1]s FROM %[2]s WHERE %[3]s ORDER BY RAND() LIMIT %[4]d)rand_tmp GROUP BY %[1]s ORDER BY %[1]s%[5]s",
escapeName(column), TableName(schemaName, table), limitRange, num, collation)
log.Debugf("get random values sql: %s, args: %v", query, limitArgs)

rows, err := db.QueryContext(ctx, query, limitArgs...)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
defer rows.Close()

for rows.Next() {
var value interface{}
err = rows.Scan(&value)
var value string
var count int
err = rows.Scan(&value, &count)
if err != nil {
return nil, errors.Trace(err)
return nil, nil, errors.Trace(err)
}
randomValue = append(randomValue, value)
valueCount = append(valueCount, count)
}

return randomValue, nil
return randomValue, valueCount, errors.Trace(rows.Err())
}

// GetMinMaxValue returns the minimum and maximum value of the given column
// among the rows that satisfy limitRange.
//
// limitRange is an optional WHERE condition that may contain '?' placeholders;
// limitArgs holds the values bound to those placeholders. An empty limitRange
// means no restriction. When collation is not empty it is applied to the column
// inside MIN() and MAX().
//
// ErrNoData is returned when MIN or MAX comes back as NULL, i.e. no row
// matched. Any query, scan or iteration error is returned (traced) instead.
func GetMinMaxValue(ctx context.Context, db *sql.DB, schema, table, column string, limitRange string, limitArgs []interface{}, collation string) (string, string, error) {
	/*
		example:
		mysql> SELECT MIN(`id`) as MIN, MAX(`id`) as MAX FROM `test`.`testa` WHERE id > 0 AND id < 10;
		+------+------+
		| MIN  | MAX  |
		+------+------+
		|    1 |    2 |
		+------+------+
	*/

	if limitRange == "" {
		limitRange = "TRUE"
	}

	if collation != "" {
		collation = fmt.Sprintf(" COLLATE \"%s\"", collation)
	}

	query := fmt.Sprintf("SELECT /*!40001 SQL_NO_CACHE */ MIN(`%s`%s) as MIN, MAX(`%s`%s) as MAX FROM `%s`.`%s` WHERE %s",
		column, collation, column, collation, schema, table, limitRange)
	log.Debugf("GetMinMaxValue query: %v, args: %v", query, limitArgs)

	var min, max sql.NullString
	rows, err := db.QueryContext(ctx, query, limitArgs...)
	if err != nil {
		return "", "", errors.Trace(err)
	}
	defer rows.Close()

	for rows.Next() {
		if err = rows.Scan(&min, &max); err != nil {
			return "", "", errors.Trace(err)
		}
	}

	// Check the iteration error before looking at min/max: if reading the
	// result set failed, both are still invalid and the failure would
	// otherwise be misreported as ErrNoData.
	if err = rows.Err(); err != nil {
		return "", "", errors.Trace(err)
	}

	if !min.Valid || !max.Valid {
		// don't have any data
		return "", "", ErrNoData
	}

	return min.String, max.String, nil
}

// GetTables returns name of all tables in the specified schema
Expand Down Expand Up @@ -302,8 +360,8 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
columnIsNull = append(columnIsNull, fmt.Sprintf("ISNULL(`%s`)", col.Name.O))
}

query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM `%s`.`%s` WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), schemaName, tableName, limitRange)
query := fmt.Sprintf("SELECT BIT_XOR(CAST(CRC32(CONCAT_WS(',', %s, CONCAT(%s)))AS UNSIGNED)) AS checksum FROM %s WHERE %s;",
strings.Join(columnNames, ", "), strings.Join(columnIsNull, ", "), TableName(schemaName, tableName), limitRange)
log.Debugf("checksum sql: %s, args: %v", query, args)

var checksum sql.NullInt64
Expand All @@ -320,6 +378,130 @@ func GetCRC32Checksum(ctx context.Context, db *sql.DB, schemaName, tableName str
return checksum.Int64, nil
}

// Bucket is one histogram bucket as reported by TiDB's SHOW STATS_BUCKETS.
type Bucket struct {
	// Count is the value of the bucket's Count column.
	Count int64
	// LowerBound and UpperBound are the textual Lower_Bound and Upper_Bound
	// columns of the bucket.
	LowerBound, UpperBound string
}

// GetBucketsInfo runs SHOW STATS_BUCKETS in TiDB for the given schema and table
// and returns the buckets grouped by the Column_name reported for each row, in
// the order the rows were returned.
//
// If tableInfo has a single-column PRIMARY index and no buckets were reported
// under the name "PRIMARY", the buckets reported under that column's name are
// moved to the "PRIMARY" key; a NotFound error is returned when neither exists.
func GetBucketsInfo(ctx context.Context, db *sql.DB, schema, table string, tableInfo *model.TableInfo) (map[string][]Bucket, error) {
	/*
		example in tidb:
		mysql> SHOW STATS_BUCKETS WHERE db_name= "test" AND table_name="testa";
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
		| Db_name | Table_name | Partition_name | Column_name | Is_index | Bucket_id | Count | Repeats | Lower_Bound         | Upper_Bound         |
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
		| test    | testa      |                | PRIMARY     |        1 |         0 |    64 |       1 | 1846693550524203008 | 1846838686059069440 |
		| test    | testa      |                | PRIMARY     |        1 |         1 |   128 |       1 | 1846840885082324992 | 1847056389361369088 |
		+---------+------------+----------------+-------------+----------+-----------+-------+---------+---------------------+---------------------+
	*/
	buckets := make(map[string][]Bucket)
	query := "SHOW STATS_BUCKETS WHERE db_name= ? AND table_name= ?;"
	log.Debugf("GetBucketsInfo query: %s", query)

	rows, err := db.QueryContext(ctx, query, schema, table)
	if err != nil {
		return nil, errors.Trace(err)
	}
	defer rows.Close()

	cols, err := rows.Columns()
	if err != nil {
		return nil, errors.Trace(err)
	}

	for rows.Next() {
		var dbName, tableName, partitionName, columnName, lowerBound, upperBound sql.NullString
		var isIndex, bucketID, count, repeats sql.NullInt64

		// Partition_name was added in newer versions, so both the 9-column
		// and the 10-column result layouts are accepted.
		switch len(cols) {
		case 9:
			err = rows.Scan(&dbName, &tableName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
		case 10:
			err = rows.Scan(&dbName, &tableName, &partitionName, &columnName, &isIndex, &bucketID, &count, &repeats, &lowerBound, &upperBound)
		default:
			return nil, errors.New("Unknown struct for buckets info")
		}
		if err != nil {
			return nil, errors.Trace(err)
		}

		if _, ok := buckets[columnName.String]; !ok {
			buckets[columnName.String] = make([]Bucket, 0, 100)
		}
		buckets[columnName.String] = append(buckets[columnName.String], Bucket{
			Count:      count.Int64,
			LowerBound: lowerBound.String,
			UpperBound: upperBound.String,
		})
	}

	// Report an iteration failure right away: continuing with a partially
	// read result could otherwise surface as a misleading NotFound error
	// below, or hand an incomplete map back together with the error.
	if err = rows.Err(); err != nil {
		return nil, errors.Trace(err)
	}

	// when primary key is int type, the columnName will be column's name, not `PRIMARY`, check and transform here.
	indices := FindAllIndex(tableInfo)
	for _, index := range indices {
		if index.Name.O != "PRIMARY" {
			continue
		}
		_, ok := buckets[index.Name.O]
		if !ok && len(index.Columns) == 1 {
			if _, ok := buckets[index.Columns[0].Name.O]; !ok {
				return nil, errors.NotFoundf("primary key on %s in buckets info", index.Columns[0].Name.O)
			}
			buckets[index.Name.O] = buckets[index.Columns[0].Name.O]
			delete(buckets, index.Columns[0].Name.O)
		}
	}

	return buckets, nil
}

// AnalyzeValuesFromBuckets splits a bucket bound (an upper or lower bound) into
// one string per column. A bound looks like '(123, abc)' for multiple fields,
// or '123' for a single field.
//
// The number of parsed fields must equal len(cols), otherwise an error is
// returned. Fields whose column type satisfies IsTimeTypeAndNeedDecode are
// replaced by the result of DecodeTimeInBucket.
func AnalyzeValuesFromBuckets(valueString string, cols []*model.ColumnInfo) ([]string, error) {
	// FIXME: maybe some values contains '(', ')' or ', '
	fields := strings.Split(strings.Trim(valueString, "()"), ", ")
	if len(fields) != len(cols) {
		return nil, errors.Errorf("analyze value %s failed", valueString)
	}

	for idx := range cols {
		if !IsTimeTypeAndNeedDecode(cols[idx].Tp) {
			continue
		}

		decoded, err := DecodeTimeInBucket(fields[idx])
		if err != nil {
			return nil, errors.Trace(err)
		}
		fields[idx] = decoded
	}

	return fields, nil
}

// DecodeTimeInBucket parses packedStr as a base-10 uint64 and decodes it into
// the string form of a types.Time via FromPackedUint. A packed value of 0
// yields the empty string. Parse and decode errors are returned as-is.
func DecodeTimeInBucket(packedStr string) (string, error) {
	packed, parseErr := strconv.ParseUint(packedStr, 10, 64)
	switch {
	case parseErr != nil:
		return "", parseErr
	case packed == 0:
		return "", nil
	}

	var decoded types.Time
	if decodeErr := decoded.FromPackedUint(packed); decodeErr != nil {
		return "", decodeErr
	}

	return decoded.String(), nil
}

// GetTidbLatestTSO returns tidb's current TSO.
func GetTidbLatestTSO(ctx context.Context, db *sql.DB) (int64, error) {
/*
Expand Down Expand Up @@ -421,3 +603,15 @@ func TableName(schema, table string) string {
// escapeName doubles every backtick in name so that the result can be placed
// between backticks. It does not add the surrounding backticks itself.
func escapeName(name string) string {
	parts := strings.Split(name, "`")
	return strings.Join(parts, "``")
}

// ReplacePlaceholder substitutes each '?' in str with the corresponding element
// of args wrapped in single quotes. It is meant for building readable log
// output, not for building SQL to execute.
//
// Tips: make sure the number of '?' is the same as len(args); on a mismatch the
// result contains fmt's usual missing/extra argument markers.
func ReplacePlaceholder(str string, args []string) string {
	/*
		for example:
			str is "a > ? AND a < ?", args is {'1', '2'},
			this function will return "a > '1' AND a < '2'"
	*/
	// str is turned into a fmt format string, so a literal '%' in it (e.g.
	// "a LIKE '%x%'") must be escaped or it would be parsed as a verb and
	// corrupt the output. Both rewrites happen in one pass, so the "%s"
	// inserted for a placeholder is not escaped again.
	format := strings.NewReplacer("%", "%%", "?", "'%s'").Replace(str)
	return fmt.Sprintf(format, utils.StringsToInterfaces(args)...)
}
71 changes: 71 additions & 0 deletions pkg/dbutil/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2018 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package dbutil

import (
. "github.com/pingcap/check"
)

// TestReplacePlaceholder checks that ReplacePlaceholder substitutes every '?'
// with the matching argument wrapped in single quotes.
func (*testDBSuite) TestReplacePlaceholder(c *C) {
	type placeholderCase struct {
		originStr string
		args      []string
		expectStr string
	}

	cases := []placeholderCase{
		{
			originStr: "a > ? AND a < ?",
			args:      []string{"1", "2"},
			expectStr: "a > '1' AND a < '2'",
		},
		{
			originStr: "a = ? AND b = ?",
			args:      []string{"1", "2"},
			expectStr: "a = '1' AND b = '2'",
		},
	}

	for i := range cases {
		tc := cases[i]
		c.Assert(ReplacePlaceholder(tc.originStr, tc.args), Equals, tc.expectStr)
	}
}

// TestTableName checks that TableName backtick-quotes the schema and the table
// and doubles any backtick embedded in either name.
func (*testDBSuite) TestTableName(c *C) {
	type tableNameCase struct {
		schema          string
		table           string
		expectTableName string
	}

	cases := []tableNameCase{
		{schema: "test", table: "testa", expectTableName: "`test`.`testa`"},
		{schema: "test-1", table: "test-a", expectTableName: "`test-1`.`test-a`"},
		{schema: "test", table: "t`esta", expectTableName: "`test`.`t``esta`"},
	}

	for i := range cases {
		tc := cases[i]
		c.Assert(TableName(tc.schema, tc.table), Equals, tc.expectTableName)
	}
}
Loading

0 comments on commit 1a02077

Please sign in to comment.