From 6697f029008f40a79e2c428ae2d2958c2bfc8576 Mon Sep 17 00:00:00 2001 From: geray <919179287@qq.com> Date: Thu, 21 Dec 2023 17:07:55 +0800 Subject: [PATCH] feat: add branch status monitor Signed-off-by: <919179287@qq.com> --- examples/workflow/Branch/prepare.sh | 2 +- go/vt/vttablet/tabletserver/branch_watch.go | 141 +++++++++++++++++++ go/vt/vttablet/tabletserver/state_manager.go | 3 + go/vt/vttablet/tabletserver/tabletserver.go | 3 + go/vt/wrangler/branch.go | 6 - 5 files changed, 148 insertions(+), 7 deletions(-) create mode 100644 go/vt/vttablet/tabletserver/branch_watch.go diff --git a/examples/workflow/Branch/prepare.sh b/examples/workflow/Branch/prepare.sh index 5927009872..80abc3e955 100755 --- a/examples/workflow/Branch/prepare.sh +++ b/examples/workflow/Branch/prepare.sh @@ -6,7 +6,7 @@ source "$(dirname "${BASH_SOURCE[0]:-$0}")/../../common/env.sh" USER_COUNT=5000 CUSTOMER_COUNT=5000 PRODUCT_COUNT=5000 -CORDER_COUNT=10000 +CORDER_COUNT=200000 mysql -h127.0.0.1 -P15306 -e 'create database if not exists branch_source' diff --git a/go/vt/vttablet/tabletserver/branch_watch.go b/go/vt/vttablet/tabletserver/branch_watch.go new file mode 100644 index 0000000000..4c4d764fb9 --- /dev/null +++ b/go/vt/vttablet/tabletserver/branch_watch.go @@ -0,0 +1,141 @@ +/* +Copyright ApeCloud, Inc. +Licensed under the Apache v2(found in the LICENSE file in the root directory). +*/ + +package tabletserver + +import ( + "context" + "strings" + "time" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" +) + +const SeleteWorkflowNameFromBranchJobs = "SELECT * from mysql.branch_jobs;" + +const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where workflow=%a;" + +const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a" + +const ( + BranchStateOfPrepare = "prepare" + BranchStateOfRunning = "running" + BranchStateOfStop = "Stop" + BranchStateOfCompleted = "completed" + BranchStateOfError = "error" +) + +const UpdateInterval = 5 * time.Second + +type BranchWatcher struct { + dbConfig dbconfigs.Connector + conns *connpool.Pool + + ticker *time.Ticker + updateInterval time.Duration + running bool +} + +func NewBranchWatcher(env tabletenv.Env, dbConfig dbconfigs.Connector) *BranchWatcher { + branchWatcher := BranchWatcher{ + running: false, + ticker: time.NewTicker(UpdateInterval), + updateInterval: UpdateInterval, + dbConfig: dbConfig, + } + branchWatcher.conns = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{ + Size: 2, + IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, + }) + return &branchWatcher +} + +func (b *BranchWatcher) updateBranchState(ctx context.Context, conn *connpool.DBConn, state string, message, workflow string) error { + query, err := sqlparser.ParseAndBind(UpdateBranchJobStatusByWorkflow, + sqltypes.StringBindVariable(state), + sqltypes.StringBindVariable(message), + sqltypes.StringBindVariable(workflow)) + if err != nil { + return err + } + _, err = conn.Exec(ctx, query, -1, false) + if err != nil { + return err + } + return nil +} + +func (b *BranchWatcher) watch() error { + ctx := context.Background() + conn, err := b.conns.Get(ctx, nil) + if err != nil { + return err + } + defer conn.Recycle() + qr, err := conn.Exec(ctx, SeleteWorkflowNameFromBranchJobs, -1, true) + if err != nil { + return err + } + for _, row := range qr.Named().Rows { + workflow := row["workflow_name"].ToString() + query, err := sqlparser.ParseAndBind(SeleteVReplicationByWorkflow, sqltypes.StringBindVariable(workflow)) + if err != nil { + return err + } + vreplication, err := conn.Exec(ctx, query, -1, true) + rows := vreplication.Named().Row() + vState := rows["state"].ToString() + message := rows["message"].ToString() + if err != nil { + return err + } + switch vState { + case "Stopped": + if strings.Contains(message, "Stopped after copy") { + err = b.updateBranchState(ctx, conn, BranchStateOfCompleted, message, workflow) + if err != nil { + return err + } + } else { + err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow) + if err != nil { + return err + } + } + case "Error": + err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow) + if err != nil { + return err + } + case "Copying": + err = b.updateBranchState(ctx, conn, BranchStateOfRunning, message, workflow) + if err != nil { + return err + } + } + } + return nil +} + +func (b *BranchWatcher) Open() { + b.conns.Open(b.dbConfig, b.dbConfig, b.dbConfig) + b.ticker.Reset(b.updateInterval) + if !b.running { + go func() { + for range b.ticker.C { + b.watch() + } + }() + b.running = true + } +} +func (b *BranchWatcher) Close() { + b.conns.Close() + b.ticker.Stop() +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index e439a31708..a12fbfe12b 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -121,6 +121,7 @@ type stateManager struct { vstreamer subComponent tracker subComponent watcher subComponent + branchWatch subComponent qe queryEngine txThrottler txThrottler te txEngine @@ -475,6 +476,7 @@ func (sm *stateManager) servePrimary() error { sm.tableGC.Open() sm.ddle.Open() sm.tableACL.Open() + sm.branchWatch.Open() sm.setState(topodatapb.TabletType_PRIMARY, StateServing) return nil } @@ -503,6 +505,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.tableGC.Close() sm.messager.Close() sm.tracker.Close() + sm.branchWatch.Close() sm.se.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ea8b12f5c5..3b4ee8a7ca 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -122,6 +122,7 @@ type TabletServer struct { hs *healthStreamer lagThrottler *throttle.Throttler tableGC *gc.TableGC + branchWatch *BranchWatcher // sm manages state transitions. sm *stateManager @@ -189,6 +190,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) + tsv.branchWatch = NewBranchWatcher(tsv, tsv.config.DB.DbaWithDB()) tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) @@ -203,6 +205,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to vstreamer: tsv.vstreamer, tracker: tsv.tracker, watcher: tsv.watcher, + branchWatch: tsv.branchWatch, qe: tsv.qe, txThrottler: tsv.txThrottler, te: tsv.te, diff --git a/go/vt/wrangler/branch.go b/go/vt/wrangler/branch.go index 8b61c57191..99bff946b8 100644 --- a/go/vt/wrangler/branch.go +++ b/go/vt/wrangler/branch.go @@ -26,9 +26,7 @@ import ( "vitess.io/vitess/go/vt/log" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" ) type BranchJob struct { @@ -295,11 +293,7 @@ func GetBranchJobByWorkflow(ctx context.Context, workflow string, wr *Wrangler) return nil, err } onddl := branchJobMap["onddl"].ToString() - status := branchJobMap["status"].ToString() - if status != BranchStatusOfPrePare { - return nil, vterrors.Errorf(vtrpc.Code_ABORTED, "can not start an branch which status [%v] is not prepare", status) - } branchJob := &BranchJob{ sourceDatabase: sourceDatabase, targetDatabase: targetDatabase,