Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: don't skip jobs from same event when comparing table checkpoi…
Browse files Browse the repository at this point in the history
…nt (#1752) (#1783)
  • Loading branch information
lance6716 authored Jun 21, 2021
1 parent 700e447 commit 176f8e2
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 25 deletions.
20 changes: 13 additions & 7 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ type CheckPoint interface {
// DeleteTablePoint deletes checkpoint for specified table in memory and storage
DeleteTablePoint(tctx *tcontext.Context, sourceSchema, sourceTable string) error

// IsNewerTablePoint checks whether job's checkpoint is newer than previous saved checkpoint
IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool
// IsOlderThanTablePoint checks whether job's checkpoint is older than previous saved checkpoint
IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool

// SaveGlobalPoint saves the global binlog stream's checkpoint
// corresponding to Meta.Save
Expand Down Expand Up @@ -325,20 +325,26 @@ func (cp *RemoteCheckPoint) DeleteTablePoint(tctx *tcontext.Context, sourceSchem
return nil
}

// IsNewerTablePoint implements CheckPoint.IsNewerTablePoint
func (cp *RemoteCheckPoint) IsNewerTablePoint(sourceSchema, sourceTable string, pos mysql.Position) bool {
// IsOlderThanTablePoint implements CheckPoint.IsOlderThanTablePoint.
// For binlog position replication, currently DM will split rows changes of an event to jobs, so some job may has save position.
// if useLE is true, we use less than or equal.
func (cp *RemoteCheckPoint) IsOlderThanTablePoint(sourceSchema, sourceTable string, pos mysql.Position, useLE bool) bool {
cp.RLock()
defer cp.RUnlock()
mSchema, ok := cp.points[sourceSchema]
if !ok {
return true
return false
}
point, ok := mSchema[sourceTable]
if !ok {
return true
return false
}
oldPos := point.MySQLPos()
return pos.Compare(oldPos) > 0
cp.logCtx.L().Debug("compare table position whether is newer", zap.Stringer("position", pos), zap.Stringer("old position", oldPos))
if useLE {
return pos.Compare(oldPos) <= 0
}
return pos.Compare(oldPos) < 0
}

// SaveGlobalPoint implements CheckPoint.SaveGlobalPoint
Expand Down
32 changes: 16 additions & 16 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,23 +255,23 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
)

// not exist
newer := cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older := cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// rollback, to min
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save again
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// flush + rollback
s.mock.ExpectBegin()
Expand All @@ -280,22 +280,22 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// clear, to min
s.mock.ExpectBegin()
s.mock.ExpectExec(clearCheckPointSQL).WillReturnResult(sqlmock.NewResult(0, 1))
s.mock.ExpectCommit()
err = cp.Clear(tctx)
c.Assert(err, IsNil)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)

// save
cp.SaveTablePoint(schema, table, pos2)
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsFalse)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsTrue)

// test save table point less than global point
func() {
Expand All @@ -316,6 +316,6 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, [][]string{{schema, table}}, nil, nil)
c.Assert(err, IsNil)
cp.Rollback()
newer = cp.IsNewerTablePoint(schema, table, pos1)
c.Assert(newer, IsTrue)
older = cp.IsOlderThanTablePoint(schema, table, pos1, false)
c.Assert(older, IsFalse)
}
46 changes: 44 additions & 2 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package syncer

import (
"context"
"errors"
"fmt"
"math"
"reflect"
Expand Down Expand Up @@ -737,7 +738,34 @@ func (s *Syncer) checkWait(job *job) bool {
return false
}

// only used in tests.
var (
lastPos mysql.Position
lastPosNum int
waitJobsDone bool
failExecuteSQL bool
failOnce sync2.AtomicInt64
)

func (s *Syncer) addJob(job *job) error {
failpoint.Inject("countJobFromOneEvent", func() {
if job.currentPos.Compare(lastPos) == 0 {
lastPosNum++
} else {
lastPos = job.currentPos
lastPosNum = 1
}
// trigger a flush after see one job
if lastPosNum == 1 {
waitJobsDone = true
s.tctx.L().Info("meet the first job of an event", zap.Any("binlog position", lastPos))
}
// mock a execution error after see two jobs.
if lastPosNum == 2 {
failExecuteSQL = true
s.tctx.L().Info("meet the second job of an event", zap.Any("binlog position", lastPos))
}
})
var (
queueBucket int
execDDLReq *pb.ExecDDLRequest
Expand Down Expand Up @@ -788,6 +816,13 @@ func (s *Syncer) addJob(job *job) error {
}

wait := s.checkWait(job)
failpoint.Inject("flushFirstJobOfEvent", func() {
if waitJobsDone {
s.tctx.L().Info("trigger flushFirstJobOfEvent")
waitJobsDone = false
wait = true
}
})
if wait {
s.jobWg.Wait()
s.c.reset()
Expand Down Expand Up @@ -987,6 +1022,13 @@ func (s *Syncer) sync(tctx *tcontext.Context, queueBucket string, db *DBConn, jo
if len(jobs) == 0 {
return nil
}

failpoint.Inject("failSecondJobOfEvent", func() {
if failExecuteSQL && failOnce.CompareAndSwap(0, 1) {
s.tctx.L().Info("trigger failSecondJobOfEvent")
failpoint.Return(errors.New("failSecondJobOfEvent"))
}
})
queries := make([]string, 0, len(jobs))
args := make([][]interface{}, 0, len(jobs))
for _, j := range jobs {
Expand Down Expand Up @@ -1440,7 +1482,7 @@ func (s *Syncer) handleRowsEvent(ev *replication.RowsEvent, ec eventContext) err
}

// DML position before table checkpoint, ignore it
if !s.checkpoint.IsNewerTablePoint(originSchema, originTable, *ec.currentPos) {
if s.checkpoint.IsOlderThanTablePoint(originSchema, originTable, *ec.currentPos, false) {
s.tctx.L().Debug("ignore obsolete event that is old than table checkpoint", zap.String("event", "row"), log.WrapStringerField("position", ec.currentPos), zap.String("origin schema", originSchema), zap.String("origin table", originTable))
return nil
}
Expand Down Expand Up @@ -1670,7 +1712,7 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext) e

// for DDL, we wait it to be executed, so we can check if event is newer in this syncer's main process goroutine
// ignore obsolete DDL here can avoid to try-sync again for already synced DDLs
if !s.checkpoint.IsNewerTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos) {
if s.checkpoint.IsOlderThanTablePoint(tableNames[0][0].Schema, tableNames[0][0].Name, *ec.currentPos, true) {
s.tctx.L().Info("ignore obsolete DDL", zap.String("event", "query"), zap.String("statement", sql), log.WrapStringerField("position", ec.currentPos))
continue
}
Expand Down
3 changes: 3 additions & 0 deletions tests/all_mode/data/db1.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t1 (id, name) values (10, '10'), (20, '20');
insert into t1 (id, name) values (30, '30');
3 changes: 3 additions & 0 deletions tests/all_mode/data/db2.increment3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
use all_mode;
insert into t2 (id, name) values (10, '10'), (20, '20');
insert into t2 (id, name) values (30, '30');
44 changes: 44 additions & 0 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,51 @@ function test_session_config(){
echo "[$(date)] <<<<<< finish test_session_config >>>>>>"
}

function test_fail_job_between_event() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'

# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT

inject_points=(
"github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
"github.com/pingcap/dm/syncer/countJobFromOneEvent=return()"
"github.com/pingcap/dm/syncer/flushFirstJobOfEvent=return()"
"github.com/pingcap/dm/syncer/failSecondJobOfEvent=return()"
)
export GO_FAILPOINTS="$(join_string \; ${inject_points[@]})"
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

cp $cur/conf/dm-task.yaml $WORK_DIR/dm-task.yaml
sed -i 's/sql_mode: ".*"/sql_mode: "NO_AUTO_VALUE_ON_ZERO"/g' $WORK_DIR/dm-task.yaml
dmctl_start_task "$WORK_DIR/dm-task.yaml"

run_sql_file $cur/data/db1.increment3.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment3.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
sleep 2
check_log_contains $WORK_DIR/worker1/log/dm-worker.log "failSecondJobOfEvent"
check_log_contains $WORK_DIR/worker2/log/dm-worker.log "failSecondJobOfEvent"
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 3
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

cleanup_data all_mode
cleanup_process $*

export GO_FAILPOINTS=''
}

function run() {
test_fail_job_between_event

test_session_config

export GO_FAILPOINTS="github.com/pingcap/dm/dm/worker/TaskCheckInterval=return(\"500ms\")"
Expand Down

0 comments on commit 176f8e2

Please sign in to comment.