From c547adf07ef05218f15fb1f0423bca8ab4888862 Mon Sep 17 00:00:00 2001 From: newborn22 <953950914@qq.com> Date: Tue, 12 Dec 2023 14:33:06 +0800 Subject: [PATCH] feat: support throttle Signed-off-by: newborn22 <953950914@qq.com> --- .../schema/job/big_dml_jobs_table.sql | 2 + go/vt/vttablet/jobcontroller/controller.go | 170 ++++++++++++++++-- go/vt/vttablet/tabletserver/query_executor.go | 12 +- go/vt/vttablet/tabletserver/tabletserver.go | 4 +- 4 files changed, 164 insertions(+), 24 deletions(-) diff --git a/go/vt/sidecardb/schema/job/big_dml_jobs_table.sql b/go/vt/sidecardb/schema/job/big_dml_jobs_table.sql index d0dac59a04..a697b5b2be 100644 --- a/go/vt/sidecardb/schema/job/big_dml_jobs_table.sql +++ b/go/vt/sidecardb/schema/job/big_dml_jobs_table.sql @@ -31,5 +31,7 @@ CREATE TABLE IF NOT EXISTS mysql.big_dml_jobs_table `count_total_rows_sql` varchar(256) NULL DEFAULT NULL, `dml_type` varchar(256) NULL DEFAULT NULL, `affected_rows` bigint NOT NULL DEFAULT 0, + `throttle_ratio` double NULL DEFAULT NULL, + `throttle_expire_time` varchar(256) NULL DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE = InnoDB diff --git a/go/vt/vttablet/jobcontroller/controller.go b/go/vt/vttablet/jobcontroller/controller.go index c7d8425058..7c7913ff64 100644 --- a/go/vt/vttablet/jobcontroller/controller.go +++ b/go/vt/vttablet/jobcontroller/controller.go @@ -10,12 +10,18 @@ import ( "errors" "fmt" "math" + "net/http" "regexp" "strconv" "strings" "sync" + "sync/atomic" "time" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle" + "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/mysql/collations" @@ -111,14 +117,32 @@ const ( WHERE TABLE_SCHEMA = %a AND TABLE_NAME = %a` + + sqlDMLJobUpdateThrottleInfo = `update mysql.big_dml_jobs_table set + throttle_ratio = %a , + throttle_expire_time = %a + where + job_uuid = %a` + + sqlDMLJobClearThrottleInfo = `update mysql.big_dml_jobs_table set + throttle_ratio = NULL , + throttle_expire_time = NULL + where + job_uuid = %a` +) + +const ( + throttleCheckDuration = 250 * time.Millisecond ) type JobController struct { - tableName string - tableMutex sync.Mutex // todo newborn22,检查是否都上锁了 - tabletTypeFunc func() topodatapb.TabletType - env tabletenv.Env - pool *connpool.Pool + tableName string + tableMutex sync.Mutex // todo newborn22,检查是否都上锁了 + tabletTypeFunc func() topodatapb.TabletType + env tabletenv.Env + pool *connpool.Pool + lagThrottler *throttle.Throttler + lastSuccessfulThrottle int64 workingTables map[string]bool // 用于调度时检测当前任务是否和正在工作的表冲突,paused、running状态的job的表都在里面 workingTablesMutex sync.Mutex @@ -153,6 +177,7 @@ func (jc *JobController) Open() error { go jc.jobHealthCheck(jc.checkBeforeSchedule) go jc.jobScheduler(jc.checkBeforeSchedule) + initThrottleTicker() } return nil @@ -162,7 +187,7 @@ func (jc *JobController) Close() { jc.pool.Close() } -func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletType, env tabletenv.Env) *JobController { +func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletType, env tabletenv.Env, lagThrottler *throttle.Throttler) *JobController { return &JobController{ tableName: tableName, tabletTypeFunc: tabletTypeFunc, @@ -170,7 +195,8 @@ func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletT pool: connpool.NewPool(env, "DMLJobControllerPool", tabletenv.ConnPoolConfig{ Size: databasePoolSize, IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, - })} + }), + lagThrottler: lagThrottler} // 检查字段 @@ -179,7 +205,7 @@ func NewJobController(tableName string, tabletTypeFunc func() topodatapb.TabletT } // todo newborn22 , 能否改写得更有通用性? 这样改写是否好? -func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema string, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) { +func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema, expireString string, ratioLiteral *sqlparser.Literal, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) { // todo newborn22, if 可以删掉 if jc.tabletTypeFunc() == topodatapb.TabletType_PRIMARY { switch command { @@ -195,6 +221,10 @@ func (jc *JobController) HandleRequest(command, sql, jobUUID, tableSchema string return jc.LaunchJob(jobUUID) case CancelJob: return jc.CancelJob(jobUUID) + case ThrottleJob: + return jc.ThrottleJob(jobUUID, expireString, ratioLiteral) + case UnthrottleJob: + return jc.UnthrottleJob(jobUUID) } } // todo newborn22,对返回值判断为空? @@ -387,6 +417,108 @@ func (jc *JobController) CancelJob(uuid string) (*sqltypes.Result, error) { return qr, nil } +// 指定throttle的时长和ratio +// ratio表示限流的比例,最大为1,即完全限流 +// 时长的格式举例: +// "300ms" 表示 300 毫秒。 +// "-1.5h" 表示负1.5小时。 +// "2h45m" 表示2小时45分钟。 +func (jc *JobController) ThrottleJob(uuid, expireString string, ratioLiteral *sqlparser.Literal) (result *sqltypes.Result, err error) { + emptyResult := &sqltypes.Result{} + duration, ratio, err := jc.validateThrottleParams(expireString, ratioLiteral) + if err != nil { + return nil, err + } + if err := jc.lagThrottler.CheckIsReady(); err != nil { + return nil, err + } + expireAt := time.Now().Add(duration) + _ = jc.lagThrottler.ThrottleApp(uuid, expireAt, ratio) + + query, err := sqlparser.ParseAndBind(sqlDMLJobUpdateThrottleInfo, + sqltypes.Float64BindVariable(ratio), + sqltypes.StringBindVariable(expireAt.String()), + sqltypes.StringBindVariable(uuid)) + if err != nil { + return emptyResult, err + } + ctx := context.Background() + jc.tableMutex.Lock() + defer jc.tableMutex.Unlock() + return jc.execQuery(ctx, "", query) +} + +func (jc *JobController) UnthrottleJob(uuid string) (result *sqltypes.Result, err error) { + emptyResult := &sqltypes.Result{} + if err := jc.lagThrottler.CheckIsReady(); err != nil { + return nil, err + } + _ = jc.lagThrottler.UnthrottleApp(uuid) + + query, err := sqlparser.ParseAndBind(sqlDMLJobClearThrottleInfo, + sqltypes.StringBindVariable(uuid)) + if err != nil { + return emptyResult, err + } + ctx := context.Background() + jc.tableMutex.Lock() + defer jc.tableMutex.Unlock() + return jc.execQuery(ctx, "", query) +} + +var throttleTicks int64 +var throttleInit sync.Once + +func initThrottleTicker() { + throttleInit.Do(func() { + go func() { + tick := time.NewTicker(throttleCheckDuration) + defer tick.Stop() + for range tick.C { + atomic.AddInt64(&throttleTicks, 1) + } + }() + }) +} + +func (jc *JobController) requestThrottle(uuid string) (throttleCheckOK bool) { + if jc.lastSuccessfulThrottle >= atomic.LoadInt64(&throttleTicks) { + // if last check was OK just very recently there is no need to check again + return true + } + ctx := context.Background() + // 请求时给每一个throttle的app名都加上了dml-job前缀,这样可以通过throttle dml-job来throttle所有的dml jobs + appName := "dml-job:" + uuid + // 这里不特别设置flag + throttleCheckFlags := &throttle.CheckFlags{} + // 由于dml job子任务需要同步到集群中的各个从节点,因此throttle也依据的是集群的复制延迟 + checkType := throttle.ThrottleCheckPrimaryWrite + checkRst := jc.lagThrottler.CheckByType(ctx, appName, "", throttleCheckFlags, checkType) + if checkRst.StatusCode != http.StatusOK { + return false + } + jc.lastSuccessfulThrottle = atomic.LoadInt64(&throttleTicks) + return true +} + +func (jc *JobController) validateThrottleParams(expireString string, ratioLiteral *sqlparser.Literal) (duration time.Duration, ratio float64, err error) { + duration = time.Hour * 24 * 365 * 100 + if expireString != "" { + duration, err = time.ParseDuration(expireString) + if err != nil || duration < 0 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid EXPIRE value: %s. Try '120s', '30m', '1h', etc. Allowed units are (s)ec, (m)in, (h)hour", expireString) + } + } + ratio = 1.0 + if ratioLiteral != nil { + ratio, err = strconv.ParseFloat(ratioLiteral.Val, 64) + if err != nil || ratio < 0 || ratio > 1 { + return duration, ratio, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "invalid RATIO value: %s. Try any decimal number between '0.0' (no throttle) and `1.0` (fully throttled)", ratioLiteral.Val) + } + } + return duration, ratio, nil +} + func (jc *JobController) CompleteJob(ctx context.Context, uuid, table string) (*sqltypes.Result, error) { jc.workingTablesMutex.Lock() defer jc.workingTablesMutex.Unlock() @@ -513,6 +645,12 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm select { case <-timer.C: // 定时器触发时执行的函数 + + // 先请求throttle,若被throttle阻塞,则等待下一次timer事件 + if !jc.requestThrottle(uuid) { + continue + } + // 获得sql,分update和delete两种情况 var query string if dmlType == "update" { @@ -525,8 +663,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm query = subtaskSQL } - // todo newborn22,删除,旧方案 有bug - //qr, err := jc.execQuery(ctx, relatedSchema, query) affectedRows, err := jc.execSubtaskAndRecord(ctx, relatedSchema, query, uuid) if err != nil { @@ -535,8 +671,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm } // complete,分update和delete两种情况 - // todo newborn22,删除,旧方案 有bug - //if (dmlType == "delete" && qr.RowsAffected == 0) || (dmlType == "update" && offset >= countTotalRows) { if (dmlType == "delete" && affectedRows == 0) || (dmlType == "update" && offset >= countTotalRows) { _, err = jc.CompleteJob(ctx, uuid, table) if err != nil { @@ -545,12 +679,6 @@ func (jc *JobController) dmlJobRunner(uuid, table, relatedSchema, subtaskSQL, dm return } - // todo newborn22,删除,旧方案 有bug - //err = jc.updateJobAffectedRows(ctx, uuid, int64(qr.RowsAffected)) - //if err != nil { - // jc.FailJob(ctx, uuid, err.Error(), table) - // return - //} if dmlType == "update" { //offset += int64(qr.RowsAffected) offset += subtaskRows @@ -631,6 +759,9 @@ func (jc *JobController) genSubtaskDMLSQL(sql, tableSchema string, subtaskRows i } tableName := sqlparser.String(s.TableExprs) whereStr := sqlparser.String(s.Where) + if s.Where == nil { + return "", "", "", "", errors.New("the sql without WHERE can't be transferred to a DML job") + } countTotalRowsSQL = fmt.Sprintf("select count(*) from %s %s", tableName, whereStr) return tableName, "delete", sqlparser.String(s), countTotalRowsSQL, nil @@ -670,6 +801,9 @@ func (jc *JobController) genSubtaskDMLSQL(sql, tableSchema string, subtaskRows i selectStr += fmt.Sprintf(" from %s ", tableName) whereStr := sqlparser.String(s.Where) + if s.Where == nil { + return "", "", "", "", errors.New("the sql without WHERE can't be transferred to a DML job") + } selectStr += whereStr firstPK = true diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 2f6576557f..8b080f025b 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -1161,13 +1161,17 @@ func (qre *QueryExecutor) execAlterDMLJob() (*sqltypes.Result, error) { uuid := alterDMLJob.UUID switch alterDMLJob.Type { case sqlparser.PauseDMLJobType: - return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", 0, 0, false, false) + return qre.tsv.dmlJonController.HandleRequest("pause", "", uuid, "", "", nil, 0, 0, false, false) case sqlparser.ResumeDMLJobType: - return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", 0, 0, false, false) + return qre.tsv.dmlJonController.HandleRequest("resume", "", uuid, "", "", nil, 0, 0, false, false) case sqlparser.LaunchDMLJobType: - return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", 0, 0, false, false) + return qre.tsv.dmlJonController.HandleRequest("launch", "", uuid, "", "", nil, 0, 0, false, false) case sqlparser.CancelDMLJobType: - return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", 0, 0, false, false) + return qre.tsv.dmlJonController.HandleRequest("cancel", "", uuid, "", "", nil, 0, 0, false, false) + case sqlparser.ThrottleDMLJobType: + return qre.tsv.dmlJonController.HandleRequest("throttle", "", uuid, "", alterDMLJob.Expire, alterDMLJob.Ratio, 0, 0, false, false) + case sqlparser.UnthrottleDMLJobType: + return qre.tsv.dmlJonController.HandleRequest("unthrottle", "", uuid, "", "", nil, 0, 0, false, false) } return nil, vterrors.New(vtrpcpb.Code_UNIMPLEMENTED, "ALTER DML_JOB not implemented") } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e6bc86690f..74be06e1d4 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -194,7 +194,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer) - tsv.dmlJonController = jobcontroller.NewJobController("big_dml_jobs_table", tabletTypeFunc, tsv) + tsv.dmlJonController = jobcontroller.NewJobController("big_dml_jobs_table", tabletTypeFunc, tsv, tsv.lagThrottler) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) tsv.sm = &stateManager{ @@ -1495,7 +1495,7 @@ func (tsv *TabletServer) SetFailPoint(ctx context.Context, command string, key s // todo newborn22,改名,submitDMLjob func (tsv *TabletServer) SubmitDMLJob(ctx context.Context, command, sql, jobUUID, tableSchema string, timeGapInMs, subtaskRows int64, postponeLaunch, autoRetry bool) (*sqltypes.Result, error) { // todo newborn22, 这个地方要进行封装?,变成更通用的 - return tsv.dmlJonController.HandleRequest(command, sql, jobUUID, tableSchema, timeGapInMs, subtaskRows, postponeLaunch, autoRetry) + return tsv.dmlJonController.HandleRequest(command, sql, jobUUID, tableSchema, "", nil, timeGapInMs, subtaskRows, postponeLaunch, autoRetry) } // execRequest performs verifications, sets up the necessary environments