Skip to content

Commit

Permalink
feat: support postpone and cancel
Browse files: browse the repository at this point in the history
Signed-off-by: newborn22 <[email protected]>
  • Loading branch information
newborn22 committed Dec 12, 2023
1 parent 0330fdf commit 1d8aeff
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 52 deletions.
145 changes: 93 additions & 52 deletions go/vt/vttablet/jobcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const (
ThrottleAllJobs = "throttle_all"
UnthrottleJob = "unthrottle"
UnthrottleAllJobs = "unthrottle_all"
CancelJob = "cancel_job"
CancelJob = "cancel"
)

const (
Expand Down Expand Up @@ -354,19 +354,48 @@ func (jc *JobController) CancelJob(uuid string) (*sqltypes.Result, error) {
if err != nil {
return emptyResult, nil
}
if status == canceledStatus || status == failedStatus {
if status == canceledStatus || status == failedStatus || status == completedStatus {
emptyResult.Info = fmt.Sprintf(" The job status is %s and can't canceld", status)
return emptyResult, nil
}

return &sqltypes.Result{}, nil
qr, err := jc.updateJobStatus(ctx, uuid, canceledStatus)
if err != nil {
return emptyResult, nil
}

// 对于paused和running这两个状态的job,它们在内存中都有一个jobRunner协程正在运行,需要将协程关闭
if status == runningStatus || status == pausedStatus {
jc.jobChansMutex.Lock()
cancelChan := jc.jobChans[uuid].cancel
jc.jobChansMutex.Unlock()

// 由于chan的容量为1,会阻塞在这里,直到jobRunner中接收了此信号
cancelChan <- "cancel"
}

tableName, _ := jc.GetStrJobInfo(ctx, uuid, "related_table")

jc.jobChansMutex.Lock()
jc.workingUUIDsMutex.Lock()
jc.workingTablesMutex.Lock()
jc.deleteDMLJobRunningMeta(uuid, tableName)
jc.jobChansMutex.Unlock()
jc.workingUUIDsMutex.Unlock()
jc.workingTablesMutex.Unlock()

return qr, nil
}

func (jc *JobController) CompleteJob(ctx context.Context, uuid, table string) (*sqltypes.Result, error) {
jc.workingTablesMutex.Lock()
defer jc.workingTablesMutex.Unlock()
delete(jc.workingTables, table)

jc.workingUUIDsMutex.Lock()
defer jc.workingUUIDsMutex.Unlock()
delete(jc.workingUUIDs, uuid)

jc.jobChansMutex.Lock()
defer jc.jobChansMutex.Unlock()
close(jc.jobChans[uuid].pauseAndResume)
Expand Down Expand Up @@ -420,10 +449,11 @@ func (jc *JobController) jobScheduler(checkBeforeSchedule chan struct{}) {
subtaskSQL := row["subtask_sql"].ToString()
dmlType := row["dml_type"].ToString()
countTotalRows, _ := row["count_total_rows"].ToInt64()
subtaskRows, _ := row["subtask_rows"].ToInt64()
if jc.checkDmlJobRunnable(status, table) {
// todo 这里之后改成休眠的方式后要删掉, 由于外面拿锁,必须在这里就加上,不然后面的循环可能:已经启动go runner的但是还未加入到working table,导致多个表的同时启动
jc.initDMLJobRunningMeta(uuid, table)
go jc.dmlJobRunner(uuid, table, schema, subtaskSQL, dmlType, timegap, countTotalRows, 0, true)
go jc.dmlJobRunner(uuid, table, schema, subtaskSQL, dmlType, timegap, countTotalRows, 0, subtaskRows, true)
}
}

Expand All @@ -447,13 +477,14 @@ func (jc *JobController) checkDmlJobRunnable(status, table string) bool {
return true
}

func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dmlType string, timeGap, countTotalRows, offset int64, updateStatusRunning bool) {
func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dmlType string, timeGap, countTotalRows, offset, subtaskRows int64, updateStatusRunning bool) {

jc.jobChansMutex.Lock()
jobChan := jc.jobChans[uuid]
jc.jobChansMutex.Unlock()

pauseAndResumeChan := jobChan.pauseAndResume
cancelChan := jobChan.cancel

// timeGap 单位ms,duration输入ns,应该乘上1000000
timer := time.NewTicker(time.Duration(timeGap * 1e6))
Expand All @@ -471,60 +502,69 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm

// 在一个无限循环中等待定时器触发
for {
// 第一层select用于接收是否有用户的cancel命令,以随时结束协程
select {
case <-timer.C:
// 定时器触发时执行的函数
// 获得sql,分update和delete两种情况
var query string
if dmlType == "update" {
query, err = sqlparser.ParseAndBind(subtaskSQL, sqltypes.Int64BindVariable(offset))
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
}
}
if dmlType == "delete" {
query = subtaskSQL
}

// todo newborn22,删除,旧方案 有bug
//qr, err := jc.execQuery(ctx, relatedSchema, query)
affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid)

if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
case cmd := <-cancelChan:
if cmd == "cancel" {
_ = jc.updateJobMessage(ctx, uuid, fmt.Sprintf("Canceld by user at %s", time.Now().Format("2006-01-02 15:04:05")))
return
}
default:
select {
case <-timer.C:
// 定时器触发时执行的函数
// 获得sql,分update和delete两种情况
var query string
if dmlType == "update" {
query, err = sqlparser.ParseAndBind(subtaskSQL, sqltypes.Int64BindVariable(offset))
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
}
}
if dmlType == "delete" {
query = subtaskSQL
}

// todo newborn22,删除,旧方案 有bug
//qr, err := jc.execQuery(ctx, relatedSchema, query)
affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid)

// complete,分update和delete两种情况
// todo newborn22,删除,旧方案 有bug
//if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) {
if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) {
_, err = jc.CompleteJob(ctx, uuid, table)
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
return
}
return
}

// todo newborn22,删除,旧方案 有bug
//err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected))
//if err != nil {
// jc.FailJob(ctx, uuid, err.Error(), table)
// return
//}
if dmlType == "update" {
//offset += int64(qr.RowsAffected)
offset += affectedRows
}
// complete,分update和delete两种情况
// todo newborn22,删除,旧方案 有bug
//if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) {
if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) {
_, err = jc.CompleteJob(ctx, uuid, table)
if err != nil {
jc.FailJob(ctx, uuid, err.Error(), table)
}
return
}

// todo newborn22,删除,旧方案 有bug
//err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected))
//if err != nil {
// jc.FailJob(ctx, uuid, err.Error(), table)
// return
//}
if dmlType == "update" {
//offset += int64(qr.RowsAffected)
offset += subtaskRows
}

// 控制暂停
case command := <-pauseAndResumeChan:
switch command {
case "pause":
for {
cmd := <-pauseAndResumeChan
if cmd == "resume" { // actually, cmd will always be "resume", the code logic will guarantee that
break
// 控制暂停
case command := <-pauseAndResumeChan:
switch command {
case "pause":
for {
cmd := <-pauseAndResumeChan
if cmd == "resume" { // actually, cmd will always be "resume", the code logic will guarantee that
break
}
}
}
}
Expand Down Expand Up @@ -943,10 +983,11 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) {
dmlType := row["dml_type"].ToString()
countTotalRows, _ := row["count_total_rows"].ToInt64()
AffectedRows, _ := row["affected_rows"].ToInt64()
subtaskRows, _ := row["subtask_rows"].ToInt64()

if status == runningStatus {
jc.initDMLJobRunningMeta(uuid, table)
go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, false)
go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, subtaskRows, false)
}
if status == pausedStatus {
jc.initDMLJobRunningMeta(uuid, table)
Expand All @@ -955,7 +996,7 @@ func (jc *JobController) jobHealthCheck(checkBeforeSchedule chan struct{}) {
pauseChan := jc.jobChans[uuid].pauseAndResume
jc.jobChansMutex.Unlock()
pauseChan <- "pause"
go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, false)
go jc.dmlJobRunner(uuid, table, tableSchema, subtaskSQL, dmlType, timegap, countTotalRows, AffectedRows, subtaskRows, false)
}
}

Expand Down
4 changes: 4 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,10 @@ func (qre *QueryExecutor) execAlterDMLJob() (*sqltypes.Result, error) {
return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", 0, 0, false, false)
case sqlparser.ResumeDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", 0, 0, false, false)
case sqlparser.LaunchDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", 0, 0, false, false)
case sqlparser.CancelDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", 0, 0, false, false)
}
return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ALTER DML_JOB not implemented")
}
Expand Down

0 comments on commit 1d8aeff

Please sign in to comment.