Skip to content

Commit

Permalink
feat: support throttle
Browse files Browse the repository at this point in the history
Signed-off-by: newborn22 <[email protected]>
  • Loading branch information
newborn22 committed Dec 12, 2023
1 parent 1d8aeff commit c547adf
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 24 deletions.
2 changes: 2 additions & 0 deletions go/vt/sidecardb/schema/job/big_dml_jobs_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -31,5 +31,7 @@ CREATE TABLE IF NOT EXISTS mysql.big_dml_jobs_table
`count_total_rows_sql` varchar(256) NULL DEFAULT NULL,
`dml_type` varchar(256) NULL DEFAULT NULL,
`affected_rows` bigint NOT NULL DEFAULT 0,
`throttle_ratio` double NULL DEFAULT NULL,
`throttle_expire_time` varchar(256) NULL DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE = InnoDB
170 changes: 152 additions & 18 deletions go/vt/vttablet/jobcontroller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,18 @@ import (
"errors"
"fmt"
"math"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"

"vitess.io/vitess/go/vt/schema"

"vitess.io/vitess/go/mysql/collations"
Expand Down Expand Up @@ -111,14 +117,32 @@ const (
WHERE
TABLE_SCHEMA = %a
AND TABLE_NAME = %a`

// sqlDMLJobUpdateThrottleInfo records the throttle ratio and expiry of a job in
// mysql.big_dml_jobs_table. Bind variables, in order: ratio, expire time, job UUID
// (see ThrottleJob).
sqlDMLJobUpdateThrottleInfo = `update mysql.big_dml_jobs_table set
throttle_ratio = %a ,
throttle_expire_time = %a
where
job_uuid = %a`

// sqlDMLJobClearThrottleInfo resets both throttle columns of a job to NULL.
// Bind variable: job UUID (see UnthrottleJob).
sqlDMLJobClearThrottleInfo = `update mysql.big_dml_jobs_table set
throttle_ratio = NULL ,
throttle_expire_time = NULL
where
job_uuid = %a`
)

const (
// throttleCheckDuration is the period of the ticker started by
// initThrottleTicker; each tick increments throttleTicks, which
// requestThrottle uses to decide whether a previous successful check is
// still recent enough to reuse.
throttleCheckDuration = 250 * time.Millisecond
)

type JobController struct {
tableName string
tableMutex sync.Mutex // todo newborn22,检查是否都上锁了
tabletTypeFunc func() topodatapb.TabletType
env tabletenv.Env
pool *connpool.Pool
tableName string
tableMutex sync.Mutex // todo newborn22,检查是否都上锁了
tabletTypeFunc func() topodatapb.TabletType
env tabletenv.Env
pool *connpool.Pool
lagThrottler *throttle.Throttler
lastSuccessfulThrottle int64

workingTables map[string]bool // 用于调度时检测当前任务是否和正在工作的表冲突,paused、running状态的job的表都在里面
workingTablesMutex sync.Mutex
Expand Down Expand Up @@ -153,6 +177,7 @@ func (jc *JobController) Open() error {

go jc.jobHealthCheck(jc.checkBeforeSchedule)
go jc.jobScheduler(jc.checkBeforeSchedule)
initThrottleTicker()

}
return nil
Expand All @@ -162,15 +187,16 @@ func (jc *JobController) Close() {
jc.pool.Close()
}

func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletType, env tabletenv.Env) *JobController {
func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletType, env tabletenv.Env, lagThrottler *throttle.Throttler) *JobController {
return &JobController{
tableName: tableName,
tabletTypeFunc: tabletTypeFunc,
env: env,
pool: connpool.NewPool(env, "DMLJobControllerPool", tabletenv.ConnPoolConfig{
Size: databasePoolSize,
IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds,
})}
}),
lagThrottler: lagThrottler}

// 检查字段

Expand All @@ -179,7 +205,7 @@ func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletT
}

// todo newborn22 , 能否改写得更有通用性? 这样改写是否好?
func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema string, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) {
func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema, expireString string, ratioLiteral *sqlparser.Literal, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) {
// todo newborn22, if 可以删掉
if jc.tabletTypeFunc() == topodatapb.TabletType_PRIMARY {
switch command {
Expand All @@ -195,6 +221,10 @@ func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema string
return jc.LaunchJob(jobUUID)
case CancelJob:
return jc.CancelJob(jobUUID)
case ThrottleJob:
return jc.ThrottleJob(jobUUID, expireString, ratioLiteral)
case UnthrottleJob:
return jc.UnthrottleJob(jobUUID)
}
}
// todo newborn22,对返回值判断为空?
Expand Down Expand Up @@ -387,6 +417,108 @@ func (jc *JobController) CancelJob(uuid string) (*sqltypes.Result, error) {
return qr, nil
}

// ThrottleJob throttles the DML job identified by uuid and records the
// throttle settings in the job table.
//
// expireString is a duration in time.ParseDuration syntax, for example
// "300ms", "90s" or "2h45m". Negative durations are rejected by
// validateThrottleParams; an empty string selects its default of roughly 100
// years. ratioLiteral is the throttle ratio in [0, 1], where 1 means fully
// throttled; nil selects the default of 1.0.
//
// It returns an error when the parameters are invalid, when the lag throttler
// reports it is not ready, or when building or executing the UPDATE fails.
func (jc *JobController) ThrottleJob(uuid, expireString string, ratioLiteral *sqlparser.Literal) (result *sqltypes.Result, err error) {
	emptyResult := &sqltypes.Result{}
	duration, ratio, err := jc.validateThrottleParams(expireString, ratioLiteral)
	if err != nil {
		return nil, err
	}
	if err := jc.lagThrottler.CheckIsReady(); err != nil {
		return nil, err
	}
	expireAt := time.Now().Add(duration)
	// The value returned by ThrottleApp is intentionally discarded; only its
	// effect on the throttler is needed here.
	// NOTE(review): the app name registered here is the bare uuid, while
	// requestThrottle checks "dml-job:"+uuid — confirm the throttler matches
	// the two.
	_ = jc.lagThrottler.ThrottleApp(uuid, expireAt, ratio)

	// Round(0) strips the monotonic clock reading. Without it, String() on a
	// time.Now()-derived value appends a process-local "m=+..." suffix, which
	// is meaningless once persisted in the table.
	query, err := sqlparser.ParseAndBind(sqlDMLJobUpdateThrottleInfo,
		sqltypes.Float64BindVariable(ratio),
		sqltypes.StringBindVariable(expireAt.Round(0).String()),
		sqltypes.StringBindVariable(uuid))
	if err != nil {
		return emptyResult, err
	}
	ctx := context.Background()
	// tableMutex is held for the duration of the UPDATE on the job table.
	jc.tableMutex.Lock()
	defer jc.tableMutex.Unlock()
	return jc.execQuery(ctx, "", query)
}

// UnthrottleJob removes the throttle on the DML job identified by uuid and
// clears the throttle columns of that job in the job table.
//
// It returns an error when the lag throttler reports it is not ready, or when
// building or executing the UPDATE fails.
func (jc *JobController) UnthrottleJob(uuid string) (result *sqltypes.Result, err error) {
	if readyErr := jc.lagThrottler.CheckIsReady(); readyErr != nil {
		return nil, readyErr
	}
	// The value returned by UnthrottleApp is not needed here.
	_ = jc.lagThrottler.UnthrottleApp(uuid)

	clearQuery, bindErr := sqlparser.ParseAndBind(sqlDMLJobClearThrottleInfo, sqltypes.StringBindVariable(uuid))
	if bindErr != nil {
		return &sqltypes.Result{}, bindErr
	}

	// tableMutex is held for the duration of the UPDATE on the job table.
	jc.tableMutex.Lock()
	defer jc.tableMutex.Unlock()
	return jc.execQuery(context.Background(), "", clearQuery)
}

var (
	// throttleTicks is incremented once per throttleCheckDuration by the
	// ticker goroutine; it is accessed with sync/atomic.
	throttleTicks int64
	// throttleInit ensures the ticker goroutine is started only once.
	throttleInit sync.Once
)

// initThrottleTicker starts, at most once, the background goroutine that
// advances throttleTicks.
func initThrottleTicker() {
	throttleInit.Do(func() {
		go runThrottleTicker()
	})
}

// runThrottleTicker increments throttleTicks on every ticker event. The loop
// has no exit condition of its own.
func runThrottleTicker() {
	ticker := time.NewTicker(throttleCheckDuration)
	defer ticker.Stop()
	for range ticker.C {
		atomic.AddInt64(&throttleTicks, 1)
	}
}

// requestThrottle asks the lag throttler whether the job identified by uuid
// may proceed. It returns true when the check passes, or when a previous check
// already succeeded within the current throttle tick, and false otherwise.
//
// NOTE(review): lastSuccessfulThrottle lives on the controller, not on the
// job, so a successful check for one job lets every other job skip its own
// check until the next tick — confirm this is intended, since it can bypass a
// throttle placed on a single job.
func (jc *JobController) requestThrottle(uuid string) (throttleCheckOK bool) {
	// lastSuccessfulThrottle is accessed atomically: this method is called
	// from the job runner loop, presumably one goroutine per running job.
	if atomic.LoadInt64(&jc.lastSuccessfulThrottle) >= atomic.LoadInt64(&throttleTicks) {
		// if last check was OK just very recently there is no need to check again
		return true
	}
	ctx := context.Background()
	// Every app name is prefixed with "dml-job" so that all DML jobs can be
	// throttled at once by throttling "dml-job".
	appName := "dml-job:" + uuid
	// No special flags are set for this check.
	throttleCheckFlags := &throttle.CheckFlags{}
	// DML job subtasks have to be replicated to the replicas of the cluster,
	// so the check is based on the cluster's replication lag.
	checkType := throttle.ThrottleCheckPrimaryWrite
	checkRst := jc.lagThrottler.CheckByType(ctx, appName, "", throttleCheckFlags, checkType)
	if checkRst.StatusCode != http.StatusOK {
		return false
	}
	atomic.StoreInt64(&jc.lastSuccessfulThrottle, atomic.LoadInt64(&throttleTicks))
	return true
}

// validateThrottleParams parses and validates the parameters of a throttle
// request.
//
// expireString is parsed with time.ParseDuration and must not be negative; an
// empty string yields a default of 100 * 365 days. ratioLiteral.Val is parsed
// as a float64 and must lie in [0, 1]; a nil literal yields the default 1.0.
//
// On invalid input it returns a vtrpcpb.Code_INVALID_ARGUMENT error.
func (jc *JobController) validateThrottleParams(expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) {
	duration = time.Hour * 24 * 365 * 100
	if expireString != "" {
		duration, err = time.ParseDuration(expireString)
		if err != nil || duration < 0 {
			return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString)
		}
	}
	ratio = 1.0
	if ratioLiteral != nil {
		ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64)
		// ParseFloat accepts "NaN" without error, and NaN compares false
		// against both bounds, so it has to be rejected explicitly.
		if err != nil || math.IsNaN(ratio) || ratio < 0 || ratio > 1 {
			return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val)
		}
	}
	return duration, ratio, nil
}

func (jc *JobController) CompleteJob(ctx context.Context, uuid, table string) (*sqltypes.Result, error) {
jc.workingTablesMutex.Lock()
defer jc.workingTablesMutex.Unlock()
Expand Down Expand Up @@ -513,6 +645,12 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm
select {
case <-timer.C:
// 定时器触发时执行的函数

// 先请求throttle,若被throttle阻塞,则等待下一次timer事件
if !jc.requestThrottle(uuid) {
continue
}

// 获得sql,分update和delete两种情况
var query string
if dmlType == "update" {
Expand All @@ -525,8 +663,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm
query = subtaskSQL
}

// todo newborn22,删除,旧方案 有bug
//qr, err := jc.execQuery(ctx, relatedSchema, query)
affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid)

if err != nil {
Expand All @@ -535,8 +671,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm
}

// complete,分update和delete两种情况
// todo newborn22,删除,旧方案 有bug
//if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) {
if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) {
_, err = jc.CompleteJob(ctx, uuid, table)
if err != nil {
Expand All @@ -545,12 +679,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm
return
}

// todo newborn22,删除,旧方案 有bug
//err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected))
//if err != nil {
// jc.FailJob(ctx, uuid, err.Error(), table)
// return
//}
if dmlType == "update" {
//offset += int64(qr.RowsAffected)
offset += subtaskRows
Expand Down Expand Up @@ -631,6 +759,9 @@ func (jc *JobController) genSubtaskDMLSQL(sql, tableSchema string, subtaskRows i
}
tableName := sqlparser.String(s.TableExprs)
whereStr := sqlparser.String(s.Where)
if s.Where == nil {
return "", "", "", "", errors.New("the sql without WHERE can't be transferred to a DML job")
}
countTotalRowsSQL = fmt.Sprintf("select count(*) from %s %s", tableName, whereStr)

return tableName, "delete", sqlparser.String(s), countTotalRowsSQL, nil
Expand Down Expand Up @@ -670,6 +801,9 @@ func (jc *JobController) genSubtaskDMLSQL(sql, tableSchema string, subtaskRows i
selectStr += fmt.Sprintf(" from %s ", tableName)

whereStr := sqlparser.String(s.Where)
if s.Where == nil {
return "", "", "", "", errors.New("the sql without WHERE can't be transferred to a DML job")
}
selectStr += whereStr

firstPK = true
Expand Down
12 changes: 8 additions & 4 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1161,13 +1161,17 @@ func (qre *QueryExecutor) execAlterDMLJob() (*sqltypes.Result, error) {
uuid := alterDMLJob.UUID
switch alterDMLJob.Type {
case sqlparser.PauseDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", 0, 0, false, false)
return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", "", nil, 0, 0, false, false)
case sqlparser.ResumeDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", 0, 0, false, false)
return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", "", nil, 0, 0, false, false)
case sqlparser.LaunchDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", 0, 0, false, false)
return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", "", nil, 0, 0, false, false)
case sqlparser.CancelDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", 0, 0, false, false)
return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", "", nil, 0, 0, false, false)
case sqlparser.ThrottleDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("throttle", "", uuid, "", alterDMLJob.Expire, alterDMLJob.Ratio, 0, 0, false, false)
case sqlparser.UnthrottleDMLJobType:
return qre.tsv.dmlJonController.HandleRequest("unthrottle", "", uuid, "", "", nil, 0, 0, false, false)
}
return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ALTER DML_JOB not implemented")
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer)
tsv.dmlJonController = jobcontroller.NewJobController("big_dml_jobs_table", tabletTypeFunc, tsv)
tsv.dmlJonController = jobcontroller.NewJobController("big_dml_jobs_table", tabletTypeFunc, tsv, tsv.lagThrottler)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)

tsv.sm = &stateManager{
Expand Down Expand Up @@ -1495,7 +1495,7 @@ func (tsv *TabletServer) SetFailPoint(ctx context.Context, command string, key s
// todo newborn22,改名,submitDMLjob
func (tsv *TabletServer) SubmitDMLJob(ctx context.Context, command, sql, jobUUID, tableSchema string, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) {
// todo newborn22, 这个地方要进行封装?,变成更通用的
return tsv.dmlJonController.HandleRequest(command, sql, jobUUID, tableSchema, timeGapInMs, subtaskRows, postponeLaunch, autoRetry)
return tsv.dmlJonController.HandleRequest(command, sql, jobUUID, tableSchema, "", nil, timeGapInMs, subtaskRows, postponeLaunch, autoRetry)
}

// execRequest performs verifications, sets up the necessary environments
Expand Down

0 comments on commit c547adf

Please sign in to comment.