From 1d8aefff56f4988db2932470e4d14bc45a5c5b66 Mon Sep 17 00:00:00 2001 From: newborn22 <953950914@qq.com> Date: Mon, 11 Dec 2023 18:19:09 +0800 Subject: [PATCH] feat: support postpone and cancel Signed-off-by: newborn22 <953950914@qq.com> --- go/vt/vttablet/jobcontroller/controller.go | 145 +++++++++++------- go/vt/vttablet/tabletserver/query_executor.go | 4 + 2 files changed, 97 insertions(+), 52 deletions(-) diff --git a/go/vt/vttablet/jobcontroller/controller.go b/go/vt/vttablet/jobcontroller/controller.go index 6603688370..c7d8425058 100644 --- a/go/vt/vttablet/jobcontroller/controller.go +++ b/go/vt/vttablet/jobcontroller/controller.go @@ -47,7 +47,7 @@ const ( ThrottleAllJobs = "throttle_all" UnthrottleJob = "unthrottle" UnthrottleAllJobs = "unthrottle_all" - CancelJob = "cancel_job" + CancelJob = "cancel" ) const ( @@ -354,12 +354,37 @@ func (jc *JobController) CancelJob(uuid string) (*sqltypes.Result, error) { if err != nil { return emptyResult, nil } - if status == canceledStatus || status == failedStatus { + if status == canceledStatus || status == failedStatus || status == completedStatus { emptyResult.Info = fmt.Sprintf(" The job status is %s and can't canceld", status) return emptyResult, nil } - return &sqltypes.Result{}, nil + qr, err := jc.updateJobStatus(ctx, uuid, canceledStatus) + if err != nil { + return emptyResult, nil + } + + // 对于paused和running这两个状态的job,它们在内存中都有一个jobRunner协程正在运行,需要将协程关闭 + if status == runningStatus || status == pausedStatus { + jc.jobChansMutex.Lock() + cancelChan := jc.jobChans[uuid].cancel + jc.jobChansMutex.Unlock() + + // 由于chan的容量为1,会阻塞在这里,直到jobRunner中接收了此信号 + cancelChan <- "cancel" + } + + tableName, _ := jc.GetStrJobInfo(ctx, uuid, "related_table") + + jc.jobChansMutex.Lock() + jc.workingUUIDsMutex.Lock() + jc.workingTablesMutex.Lock() + jc.deleteDMLJobRunningMeta(uuid, tableName) + jc.jobChansMutex.Unlock() + jc.workingUUIDsMutex.Unlock() + jc.workingTablesMutex.Unlock() + + return qr, nil } func (jc *JobController) CompleteJob(ctx context.Context, uuid, table string) (*sqltypes.Result, error) { @@ -367,6 +392,10 @@ func (jc *JobController) CompleteJob(ctx context.Context, uuid, table string) (* defer jc.workingTablesMutex.Unlock() delete(jc.workingTables, table) + jc.workingUUIDsMutex.Lock() + defer jc.workingUUIDsMutex.Unlock() + delete(jc.workingUUIDs, uuid) + jc.jobChansMutex.Lock() defer jc.jobChansMutex.Unlock() close(jc.jobChans[uuid].pauseAndResume) @@ -420,10 +449,11 @@ func (jc *JobController) jobScheduler(checkBeforeSchedule chan struct{}) { subtaskSQL := row["subtask_sql"].ToString() dmlType := row["dml_type"].ToString() countTotalRows, _ := row["count_total_rows"].ToInt64() + subtaskRows, _ := row["subtask_rows"].ToInt64() if jc.checkDmlJobRunnable(status, table) { // todo 这里之后改成休眠的方式后要删掉, 由于外面拿锁,必须在这里就加上,不然后面的循环可能:已经启动go runner的但是还未加入到working table,导致多个表的同时启动 jc.initDMLJobRunningMeta(uuid, table) - go jc.dmlJobRunner(uuid, table, schema, subtaskSQL, dmlType, timegap, countTotalRows, 0, true) + go jc.dmlJobRunner(uuid, table, schema, subtaskSQL, dmlType, timegap, countTotalRows, 0, subtaskRows, true) } } @@ -447,13 +477,14 @@ func (jc *JobController) checkDmlJobRunnable(status, table string) bool { return true } -func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dmlType string, timeGap, countTotalRows, offset int64, updateStatusRunning bool) { +func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dmlType string, timeGap, countTotalRows, offset, subtaskRows int64, updateStatusRunning bool) { jc.jobChansMutex.Lock() jobChan := jc.jobChans[uuid] jc.jobChansMutex.Unlock() pauseAndResumeChan := jobChan.pauseAndResume + cancelChan := jobChan.cancel // timeGap 单位ms,duration输入ns,应该乘上1000000 timer := time.NewTicker(time.Duration(timeGap * 1e6)) @@ -471,60 +502,69 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm // 在一个无限循环中等待定时器触发 for { + // 第一层select用于接收是否有用户的cancel命令,以随时结束协程 select { - case <-timer.C: - // 定时器触发时执行的函数 - // 获得sql,分update和delete两种情况 - var query string - if dmlType == "update" { - query, err = sqlparser.ParseAndBind(subtaskSQL, sqltypes.Int64BindVariable(offset)) - if err != nil { - jc.FailJob(ctx, uuid, err.Error(), table) - } - } - if dmlType == "delete" { - query = subtaskSQL - } - - // todo newborn22,删除,旧方案 有bug - //qr, err := jc.execQuery(ctx, relatedSchema, query) - affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid) - - if err != nil { - jc.FailJob(ctx, uuid, err.Error(), table) + case cmd := <-cancelChan: + if cmd == "cancel" { + _ = jc.updateJobMessage(ctx, uuid, fmt.Sprintf("Canceld by user at %s", time.Now().Format("2006-01-02 15:04:05"))) return } + default: + select { + case <-timer.C: + // 定时器触发时执行的函数 + // 获得sql,分update和delete两种情况 + var query string + if dmlType == "update" { + query, err = sqlparser.ParseAndBind(subtaskSQL, sqltypes.Int64BindVariable(offset)) + if err != nil { + jc.FailJob(ctx, uuid, err.Error(), table) + } + } + if dmlType == "delete" { + query = subtaskSQL + } + + // todo newborn22,删除,旧方案 有bug + //qr, err := jc.execQuery(ctx, relatedSchema, query) + affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid) - // complete,分update和delete两种情况 - // todo newborn22,删除,旧方案 有bug - //if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) { - if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) { - _, err = jc.CompleteJob(ctx, uuid, table) if err != nil { jc.FailJob(ctx, uuid, err.Error(), table) + return } - return - } - // todo newborn22,删除,旧方案 有bug - //err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected)) - //if err != nil { - // jc.FailJob(ctx, uuid, err.Error(), table) - // return - //} - if dmlType == "update" { - //offset += int64(qr.RowsAffected) - offset += affectedRows - } + // complete,分update和delete两种情况 + // todo newborn22,删除,旧方案 有bug + //if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) { + if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) { + _, err = jc.CompleteJob(ctx, uuid, table) + if err != nil { + jc.FailJob(ctx, uuid, err.Error(), table) + } + return + } + + // todo newborn22,删除,旧方案 有bug + //err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected)) + //if err != nil { + // jc.FailJob(ctx, uuid, err.Error(), table) + // return + //} + if dmlType == "update" { + //offset += int64(qr.RowsAffected) + offset += subtaskRows + } - // 控制暂停 - case command := <-pauseAndResumeChan: - switch command { - case "pause": - for { - cmd := <-pauseAndResumeChan - if cmd == "resume" { // actually, cmd will always be "resume", the code logic will guarantee that - break + // 控制暂停 + case command := <-pauseAndResumeChan: + switch command { + case "pause": + for { + cmd := <-pauseAndResumeChan + if cmd == "resume" { // actually, cmd will always be "resume", the code logic will guarantee that + break + } } } } @@ -943,10 +983,11 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) { dmlType := row["dml_type"].ToString() countTotalRows, _ := row["count_total_rows"].ToInt64() AffectedRows, _ := row["affected_rows"].ToInt64() + subtaskRows, _ := row["subtask_rows"].ToInt64() if status == runningStatus { jc.initDMLJobRunningMeta(uuid, table) - go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, false) + go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, subtaskRows, false) } if status == pausedStatus { jc.initDMLJobRunningMeta(uuid, table) @@ -955,7 +996,7 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) { pauseChan := jc.jobChans[uuid].pauseAndResume jc.jobChansMutex.Unlock() pauseChan <- "pause" - go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, false) + go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, subtaskRows, false) } } diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 1eb62ae8f1..2f6576557f 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1164,6 +1164,10 @@ func (qre *QueryExecutor) execAlterDMLJob() (*sqltypes.Result, error) { return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", 0, 0, false, false) case sqlparser.ResumeDMLJobType: return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", 0, 0, false, false) + case sqlparser.LaunchDMLJobType: + return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", 0, 0, false, false) + case sqlparser.CancelDMLJobType: + return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", 0, 0, false, false) } return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ALTER DML_JOB not implemented") }