From 5112127d06db66fa3b8beadd09c6d1df8d45d801 Mon Sep 17 00:00:00 2001 From: newborn22 <953950914@qq.com> Date: Tue, 9 Jan 2024 12:40:10 +0800 Subject: [PATCH] fix: solve split brain problem; fix bug in genCountSQL Signed-off-by: newborn22 <953950914@qq.com> --- go/vt/vttablet/jobcontroller/controller.go | 52 +++++++++++++++++---- go/vt/vttablet/jobcontroller/sql_related.go | 4 +- go/vt/vttablet/jobcontroller/sqls.go | 2 + 3 files changed, 47 insertions(+), 11 deletions(-) diff --git a/go/vt/vttablet/jobcontroller/controller.go b/go/vt/vttablet/jobcontroller/controller.go index db606580a8..b340a6a7af 100644 --- a/go/vt/vttablet/jobcontroller/controller.go +++ b/go/vt/vttablet/jobcontroller/controller.go @@ -556,6 +556,10 @@ func (jc *JobController) jobScheduler(checkBeforeSchedule chan struct{}) { defer timer.Stop() for { + // 防止vttablet不再是primary时该协程继续执行 + if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { + return + } select { case <-timer.C: case <-jc.schedulerNotifyChan: @@ -728,8 +732,7 @@ func (jc *JobController) execBatchAndRecord(ctx context.Context, tableSchema, ta } // 2.查询batch sql预计影响的行数,如果超过阈值,则生成新的batch ID - batchCountSQLForShare := batchCountSQL + " FOR SHARE" - // todo 检查batch status是否为completed,脑裂问题 + batchCountSQLForShare := batchCountSQL + " FOR UPDATE" qr, err := conn.Exec(ctx, batchCountSQLForShare, math.MaxInt32, true) if err != nil { return err @@ -738,6 +741,34 @@ func (jc *JobController) execBatchAndRecord(ctx context.Context, tableSchema, ta return errors.New("the len of qr of count expected batch size is not 1") } expectedRow, _ := qr.Named().Rows[0].ToInt64("count_rows") + + // 检查batch status是否为completed,防止vttablet脑裂问题导致一个batch被多次执行 + sqlGetBatchStatus := fmt.Sprintf(sqlTemplateGetBatchStatus, batchTable) + queryGetBatchStatus, err := sqlparser.ParseAndBind(sqlGetBatchStatus, sqltypes.StringBindVariable(batchID)) + if err != nil { + return err + } + qr, err = conn.Exec(ctx, queryGetBatchStatus, math.MaxInt32, true) + if err != nil { + return err + } + if len(qr.Named().Rows) != 1 { + return errors.New("the len of qr of count expected batch size is not 1") + } + batchStatus, _ := qr.Named().Rows[0].ToString("batch_status") + if batchStatus == completedStatus { + return nil + } + + // 将batchID信息记录在系统表中便于用户查看 + queryUpdateDealingBatchID, err := sqlparser.ParseAndBind(sqlUpdateDealingBatchID, + sqltypes.StringBindVariable(batchID), + sqltypes.StringBindVariable(uuid)) + _, err = conn.Exec(ctx, queryUpdateDealingBatchID, math.MaxInt32, false) + if err != nil { + return err + } + //batchSize = 30 if expectedRow > batchSize { batchSQL, err = jc.splitBatchIntoTwo(ctx, tableSchema, table, batchTable, batchSQL, batchCountSQL, batchID, conn, batchSize, expectedRow) @@ -930,6 +961,11 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable, // 在一个无限循环中等待定时器触发 for range timer.C { + // 防止vttablet不再是primary时该协程继续执行 + if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { + return + } + // 定时器触发时执行的函数 // 检查状态是否为running,可能为paused/canceled status, err := jc.GetStrJobInfo(ctx, uuid, "status") @@ -942,6 +978,7 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable, } // 检查是否在运维窗口内 + // todo,增加时区支持,以及是否可能由于脑裂问题导致错误fail掉job? if timePeriodStart != nil && timePeriodEnd != nil { currentTime := time.Now() if !(currentTime.After(*timePeriodStart) && currentTime.Before(*timePeriodEnd)) { @@ -973,13 +1010,6 @@ func (jc *JobController) dmlJobBatchRunner(uuid, table, tableSchema, batchTable, return } - // 将batchID信息记录在系统表中便于用户查看 - err = jc.updateDealingBatchID(ctx, uuid, batchIDToExec) - if err != nil { - jc.FailJob(ctx, uuid, err.Error(), table) - return - } - batchSQL, batchCountSQL, err := jc.getBatchSQLsByID(ctx, batchIDToExec, batchTable, tableSchema) if err != nil { jc.FailJob(ctx, uuid, err.Error(), table) @@ -1303,6 +1333,10 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) { defer timer.Stop() for range timer.C { + // 防止vttablet不再是primary时该协程继续执行 + if jc.tabletTypeFunc() != topodatapb.TabletType_PRIMARY { + return + } // todo, 增加对长时间未增加 rows的处理 jc.tableMutex.Lock() qr, _ := jc.execQuery(ctx, "", sqlDMLJobGetAllJobs) diff --git a/go/vt/vttablet/jobcontroller/sql_related.go b/go/vt/vttablet/jobcontroller/sql_related.go index e56926d5a4..c39847a254 100644 --- a/go/vt/vttablet/jobcontroller/sql_related.go +++ b/go/vt/vttablet/jobcontroller/sql_related.go @@ -111,7 +111,7 @@ func genBatchSQL(sql string, stmt sqlparser.Statement, whereExpr sqlparser.Expr, // 1.在sql中可以正确地使用between或>=,<=进行比较运算,且没有精度问题。 // 2.可以转换成go中的int64,float64或string三种类型之一,且转换后,在golang中的比较规则和mysql中的比较规则相同 func genCountSQL(tableSchema, tableName, whereExpr string) (countSQL string) { - countSQL = fmt.Sprintf("select count(*) as count_rows from %s.%s where %s)", + countSQL = fmt.Sprintf("select count(*) as count_rows from %s.%s where %s", tableSchema, tableName, whereExpr) return countSQL } @@ -119,9 +119,9 @@ func genCountSQL(tableSchema, tableName, whereExpr string) (countSQL string) { func genBatchStartAndEndStr(currentBatchStart, currentBatchEnd []sqltypes.Value) (currentBatchStartStr string, currentBatchStartEnd string, err error) { prefix := "" for i := range currentBatchStart { - prefix = "," currentBatchStartStr += prefix + currentBatchStart[i].ToString() currentBatchStartEnd += prefix + currentBatchEnd[i].ToString() + prefix = "," } return currentBatchStartStr, currentBatchStartEnd, nil } diff --git a/go/vt/vttablet/jobcontroller/sqls.go b/go/vt/vttablet/jobcontroller/sqls.go index 7412b98fcd..42da7b3686 100644 --- a/go/vt/vttablet/jobcontroller/sqls.go +++ b/go/vt/vttablet/jobcontroller/sqls.go @@ -112,6 +112,8 @@ const ( sqlTemplateGetBatchIDToExec = `SELECT batch_id FROM %s where batch_status = 'queued' order by CAST(SUBSTRING_INDEX(batch_id, '-', 1) AS SIGNED),id limit 1` + sqlTemplateGetBatchStatus = `SELECT batch_status FROM %s where batch_id=%%a` + sqlTemplateInsertBatchEntry = ` insert into %s ( batch_id, batch_sql,