diff --git a/sync_diff_inspector/diff.go b/sync_diff_inspector/diff.go index 94b95d58..b46885d0 100644 --- a/sync_diff_inspector/diff.go +++ b/sync_diff_inspector/diff.go @@ -322,6 +322,28 @@ func (df *Diff) compareStruct(ctx context.Context, tableIndex int) (isEqual bool return isEqual, isSkip, nil } +func GetSnapshot(latestSnap []string, snap string, db *sql.DB) string { + if len(latestSnap) != 1 { + return snap + } + + latestSnapshotVal, err := utils.ParseSnapshotToTSO(db, latestSnap[0]) + if err != nil || latestSnapshotVal == 0 { + return snap + } + + snapshotVal, err := utils.ParseSnapshotToTSO(db, snap) + if err != nil { + return latestSnap[0] + } + + // compare the snapshot and choose the small one to lock + if latestSnapshotVal < snapshotVal { + return latestSnap[0] + } + return snap +} + func (df *Diff) startGCKeeperForTiDB(ctx context.Context, db *sql.DB, snap string) { pdCli, _ := utils.GetPDClientForGC(ctx, db) if pdCli != nil { @@ -332,15 +354,7 @@ func (df *Diff) startGCKeeperForTiDB(ctx context.Context, db *sql.DB, snap strin return } - if len(latestSnap) == 1 { - if len(snap) == 0 { - snap = latestSnap[0] - } - // compare the snapshot and choose the small one to lock - if strings.Compare(latestSnap[0], snap) < 0 { - snap = latestSnap[0] - } - } + snap = GetSnapshot(latestSnap, snap, db) err = utils.StartGCSavepointUpdateService(ctx, pdCli, db, snap) if err != nil { diff --git a/sync_diff_inspector/diff_test.go b/sync_diff_inspector/diff_test.go new file mode 100644 index 00000000..193dbe2c --- /dev/null +++ b/sync_diff_inspector/diff_test.go @@ -0,0 +1,107 @@ +// Copyright 2024 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "testing" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" +) + +func TestGetSnapshot(t *testing.T) { + cases := []struct { + latestSnapshot []string + snapshot string + expected string + snapshotRows string + }{ + { + latestSnapshot: []string{}, + snapshot: "1", + expected: "1", + }, + { + latestSnapshot: []string{"2"}, + snapshot: "", + expected: "2", + }, + { + latestSnapshot: []string{"0"}, + snapshot: "3", + expected: "3", + }, + { + latestSnapshot: []string{"4"}, + snapshot: "0", + expected: "0", + }, + { + latestSnapshot: []string{"5"}, + snapshot: "6", + expected: "5", + }, + { + latestSnapshot: []string{"7"}, + snapshot: "6", + expected: "6", + }, + { + // 2017-10-07 16:45:26 + latestSnapshot: []string{"395146933305344000"}, + snapshot: "2017-10-08 16:45:26", + expected: "395146933305344000", + snapshotRows: "1507452326", + }, + { + // 2017-10-07 16:45:26 + latestSnapshot: []string{"395146933305344000"}, + snapshot: "2017-10-06 16:45:26", + expected: "2017-10-06 16:45:26", + snapshotRows: "1507279526", + }, + { + latestSnapshot: []string{"1"}, + snapshot: "2017-10-06 16:45:26", + expected: "1", + snapshotRows: "1507279526", + }, + { + latestSnapshot: []string{"395146933305344000"}, + snapshot: "1", + expected: "1", + }, + { + // 2090-11-19 22:07:45 + latestSnapshot: []string{"1000022649077760000"}, + snapshot: "2090-11-18 22:07:45", + expected: "2090-11-18 22:07:45", + snapshotRows: "3814697265", + }, + } + + conn, mock, err := sqlmock.New() + require.NoError(t, err) + defer conn.Close() + + for i, cs := range cases { + if len(cs.snapshotRows) > 0 { + dataRows := sqlmock.NewRows([]string{""}).AddRow(cs.snapshotRows) + mock.ExpectQuery("SELECT unix_timestamp(?)").WillReturnRows(dataRows) + } + val := GetSnapshot(cs.latestSnapshot, cs.snapshot, conn) + require.Equal(t, cs.expected, val, "case %d", i) + } + +} diff --git a/sync_diff_inspector/utils/pd.go b/sync_diff_inspector/utils/pd.go index e3c92f16..4f7db4e8 100644 --- a/sync_diff_inspector/utils/pd.go +++ b/sync_diff_inspector/utils/pd.go @@ -207,7 +207,7 @@ func StartGCSavepointUpdateService(ctx context.Context, pdCli pd.Client, db *sql return nil } // get latest snapshot - snapshotTS, err := parseSnapshotToTSO(db, snapshot) + snapshotTS, err := ParseSnapshotToTSO(db, snapshot) if tidbVersion.Compare(*autoGCSafePointVersion) > 0 { log.Info("tidb support auto gc safepoint", zap.Stringer("version", tidbVersion)) if err != nil { @@ -249,7 +249,7 @@ func updateServiceSafePoint(ctx context.Context, pdClient pd.Client, snapshotTS } } -func parseSnapshotToTSO(pool *sql.DB, snapshot string) (uint64, error) { +func ParseSnapshotToTSO(pool *sql.DB, snapshot string) (uint64, error) { snapshotTS, err := strconv.ParseUint(snapshot, 10, 64) if err == nil { return snapshotTS, nil