Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
dump: fix dump TiDB snapshot inconsistent problem (#357)
Browse files Browse the repository at this point in the history
  • Loading branch information
lichunzhu authored Oct 9, 2021
1 parent e13bfc2 commit 562cb6c
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 9 deletions.
9 changes: 5 additions & 4 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ func (d *Dumper) Dump() (dumpErr error) {
tctx, conf, pool := d.tctx, d.conf, d.dbHandle
tctx.L().Info("begin to run Dump", zap.Stringer("conf", conf))
m := newGlobalMetadata(tctx, d.extStore, conf.Snapshot)
repeatableRead := needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency)
defer func() {
if dumpErr == nil {
_ = m.writeGlobalMetaData()
Expand All @@ -100,7 +101,7 @@ func (d *Dumper) Dump() (dumpErr error) {

// for consistency lock, we should get table list at first to generate the lock tables SQL
if conf.Consistency == consistencyTypeLock {
conn, err = createConnWithConsistency(tctx, pool)
conn, err = createConnWithConsistency(tctx, pool, repeatableRead)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -126,7 +127,7 @@ func (d *Dumper) Dump() (dumpErr error) {
}
}()

metaConn, err := createConnWithConsistency(tctx, pool)
metaConn, err := createConnWithConsistency(tctx, pool, repeatableRead)
if err != nil {
return err
}
Expand Down Expand Up @@ -161,7 +162,7 @@ func (d *Dumper) Dump() (dumpErr error) {
}
// give up the last broken connection
conn.Close()
newConn, err1 := createConnWithConsistency(tctx, pool)
newConn, err1 := createConnWithConsistency(tctx, pool, repeatableRead)
if err1 != nil {
return conn, errors.Trace(err1)
}
Expand Down Expand Up @@ -251,7 +252,7 @@ func (d *Dumper) startWriters(tctx *tcontext.Context, wg *errgroup.Group, taskCh
conf, pool := d.conf, d.dbHandle
writers := make([]*Writer, conf.Threads)
for i := 0; i < conf.Threads; i++ {
conn, err := createConnWithConsistency(tctx, pool)
conn, err := createConnWithConsistency(tctx, pool, needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency))
if err != nil {
return nil, func() {}, err
}
Expand Down
13 changes: 8 additions & 5 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -680,15 +680,18 @@ func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, dsn string, pa
return newDB, errors.Trace(err)
}

func createConnWithConsistency(ctx context.Context, db *sql.DB) (*sql.Conn, error) {
func createConnWithConsistency(ctx context.Context, db *sql.DB, repeatableRead bool) (*sql.Conn, error) {
conn, err := db.Conn(ctx)
if err != nil {
return nil, errors.Trace(err)
}
query := "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"
_, err = conn.ExecContext(ctx, query)
if err != nil {
return nil, errors.Annotatef(err, "sql: %s", query)
var query string
if repeatableRead {
query = "SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ"
_, err = conn.ExecContext(ctx, query)
if err != nil {
return nil, errors.Annotatef(err, "sql: %s", query)
}
}
query = "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */"
_, err = conn.ExecContext(ctx, query)
Expand Down
4 changes: 4 additions & 0 deletions v4/export/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,7 @@ func string2Map(a, b []string) map[string]string {
}
return a2b
}

func needRepeatableRead(serverType ServerType, consistency string) bool {
return consistency != consistencyTypeSnapshot || serverType != ServerTypeTiDB
}
32 changes: 32 additions & 0 deletions v4/export/util_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2021 PingCAP, Inc. Licensed under Apache-2.0.

package export

import (
"fmt"
"testing"

"github.com/stretchr/testify/require"
)

func TestRepeatableRead(t *testing.T) {
t.Parallel()

data := [][]interface{}{
{ServerTypeUnknown, consistencyTypeNone, true},
{ServerTypeMySQL, consistencyTypeFlush, true},
{ServerTypeMariaDB, consistencyTypeLock, true},
{ServerTypeTiDB, consistencyTypeNone, true},
{ServerTypeTiDB, consistencyTypeSnapshot, false},
{ServerTypeTiDB, consistencyTypeLock, true},
}
dec := func(d []interface{}) (ServerType, string, bool) {
return ServerType(d[0].(int)), d[1].(string), d[2].(bool)
}
for tag, datum := range data {
serverTp, consistency, expectRepeatableRead := dec(datum)
comment := fmt.Sprintf("test case number: %d", tag)
rr := needRepeatableRead(serverTp, consistency)
require.True(t, rr == expectRepeatableRead, comment)
}
}

0 comments on commit 562cb6c

Please sign in to comment.