Skip to content

Commit

Permalink
fix: solve split brain problem; fix bug in genCountSQL
Browse files Browse the repository at this point in the history
Signed-off-by: newborn22 <[email protected]>
  • Loading branch information
newborn22 committed Jan 10, 2024
1 parent ac7255a commit 5112127
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
52 changes: 43 additions & 9 deletions go/vt/vttablet/jobcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,10 @@ func (jc *JobController) jobScheduler(checkBeforeSchedule chan struct{}) {
defer timer.Stop()

for {
// 防止vttablet不再是primary时该协程继续执行
if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return
}
select {
case <-timer.C:
case <-jc.schedulerNotifyChan:
Expand Down Expand Up @@ -728,8 +732,7 @@ func (jc *JobController) execBatchAndRecord(ctx context.Context, tableSchema, ta
}

// 2.查询batch sql预计影响的行数,如果超过阈值,则生成新的batch ID
batchCountSQLForShare := batchCountSQL + " FOR SHARE"
// todo 检查batch status是否为completed,脑裂问题
batchCountSQLForShare := batchCountSQL + " FOR UPDATE"
qr, err := conn.Exec(ctx, batchCountSQLForShare, math.MaxInt32, true)
if err != nil {
return err
Expand All @@ -738,6 +741,34 @@ func (jc *JobController) execBatchAndRecord(ctx context.Context, tableSchema, ta
return errors.New("the len of qr of count expected batch size is not 1")
}
expectedRow, _ := qr.Named().Rows[0].ToInt64("count_rows")

// 检查batch status是否为completed,防止vttablet脑裂问题导致一个batch被多次执行
sqlGetBatchStatus := fmt.Sprintf(sqlTemplateGetBatchStatus, batchTable)
queryGetBatchStatus, err := sqlparser.ParseAndBind(sqlGetBatchStatus, sqltypes.StringBindVariable(batchID))
if err != nil {
return err
}
qr, err = conn.Exec(ctx, queryGetBatchStatus, math.MaxInt32, true)
if err != nil {
return err
}
if len(qr.Named().Rows) != 1 {
return errors.New("the len of qr of count expected batch size is not 1")
}
batchStatus, _ := qr.Named().Rows[0].ToString("batch_status")
if batchStatus == completedStatus {
return nil
}

// 将batchID信息记录在系统表中便于用户查看
queryUpdateDealingBatchID, err := sqlparser.ParseAndBind(sqlUpdateDealingBatchID,
sqltypes.StringBindVariable(batchID),
sqltypes.StringBindVariable(uuid))
_, err = conn.Exec(ctx, queryUpdateDealingBatchID, math.MaxInt32, false)
if err != nil {
return err
}

//batchSize = 30
if expectedRow > batchSize {
batchSQL, err = jc.splitBatchIntoTwo(ctx, tableSchema, table, batchTable, batchSQL, batchCountSQL, batchID, conn, batchSize, expectedRow)
Expand Down Expand Up @@ -930,6 +961,11 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable,

// 在一个无限循环中等待定时器触发
for range timer.C {
// 防止vttablet不再是primary时该协程继续执行
if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return
}

// 定时器触发时执行的函数
// 检查状态是否为running,可能为paused/canceled
status, err := jc.GetStrJobInfo(ctx, uuid, "status")
Expand All @@ -942,6 +978,7 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable,
}

// 检查是否在运维窗口内
// todo,增加时区支持,以及是否可能由于脑裂问题导致错误fail掉job?
if timePeriodStart != nil && timePeriodEnd != nil {
currentTime := time.Now()
if !(currentTime.After(*timePeriodStart) && currentTime.Before(*timePeriodEnd)) {
Expand Down Expand Up @@ -973,13 +1010,6 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable,
return
}

// 将batchID信息记录在系统表中便于用户查看
err = jc.updateDealingBatchID(ctx, uuid, batchIDToExec)
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
return
}

batchSQL, batchCountSQL, err := jc.getBatchSQLsByID(ctx, batchIDToExec, batchTable, tableSchema)
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
Expand Down Expand Up @@ -1303,6 +1333,10 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) {
defer timer.Stop()

for range timer.C {
// 防止vttablet不再是primary时该协程继续执行
if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY {
return
}
// todo, 增加对长时间未增加 rows的处理
jc.tableMutex.Lock()
qr, _ := jc.execQuery(ctx, "", sqlDMLJobGetAllJobs)
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/jobcontroller/sql_related.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,17 @@ func genBatchSQL(sql string, stmt sqlparser.Statement, whereExpr sqlparser.Expr,
// 1.在sql中可以正确地使用between或>=,<=进行比较运算,且没有精度问题。
// 2.可以转换成go中的int64,float64或string三种类型之一,且转换后,在golang中的比较规则和mysql中的比较规则相同
func genCountSQL(tableSchema, tableName, whereExpr string) (countSQL string) {
countSQL = fmt.Sprintf("select count(*) as count_rows from %s.%s where %s)",
countSQL = fmt.Sprintf("select count(*) as count_rows from %s.%s where %s",
tableSchema, tableName, whereExpr)
return countSQL
}

func genBatchStartAndEndStr(currentBatchStart, currentBatchEnd []sqltypes.Value) (currentBatchStartStr string, currentBatchStartEnd string, err error) {
prefix := ""
for i := range currentBatchStart {
prefix = ","
currentBatchStartStr += prefix + currentBatchStart[i].ToString()
currentBatchStartEnd += prefix + currentBatchEnd[i].ToString()
prefix = ","
}
return currentBatchStartStr, currentBatchStartEnd, nil
}
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/jobcontroller/sqls.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ const (

sqlTemplateGetBatchIDToExec = `SELECT batch_id FROM %s where batch_status = 'queued' order by CAST(SUBSTRING_INDEX(batch_id, '-', 1) AS SIGNED),id limit 1`

sqlTemplateGetBatchStatus = `SELECT batch_status FROM %s where batch_id=%%a`

sqlTemplateInsertBatchEntry = ` insert into %s (
batch_id,
batch_sql,
Expand Down

0 comments on commit 5112127

Please sign in to comment.