Skip to content

Commit

Permalink
feat: add branch status monitor
Browse files Browse the repository at this point in the history
Signed-off-by: <[email protected]>
  • Loading branch information
gerayking committed Dec 21, 2023
1 parent 37d75cb commit 6697f02
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 7 deletions.
2 changes: 1 addition & 1 deletion examples/workflow/Branch/prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ source "$(dirname "${BASH_SOURCE[0]:-$0}")/../../common/env.sh"
USER_COUNT=5000
CUSTOMER_COUNT=5000
PRODUCT_COUNT=5000
CORDER_COUNT=10000
CORDER_COUNT=200000

mysql -h127.0.0.1 -P15306 -e 'create database if not exists branch_source'

Expand Down
141 changes: 141 additions & 0 deletions go/vt/vttablet/tabletserver/branch_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright ApeCloud, Inc.
Licensed under the Apache v2(found in the LICENSE file in the root directory).
*/

package tabletserver

import (
"context"
"strings"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

const SeleteWorkflowNameFromBranchJobs = "SELECT * from mysql.branch_jobs;"

const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where workflow=%a;"

const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a"

const (
BranchStateOfPrepare = "prepare"
BranchStateOfRunning = "running"
BranchStateOfStop = "Stop"
BranchStateOfCompleted = "completed"
BranchStateOfError = "error"
)

const UpdateInterval = 5 * time.Second

type BranchWatcher struct {
dbConfig dbconfigs.Connector
conns *connpool.Pool

ticker *time.Ticker
updateInterval time.Duration
running bool
}

func NewBranchWatcher(env tabletenv.Env, dbConfig dbconfigs.Connector) *BranchWatcher {
branchWatcher := BranchWatcher{
running: false,
ticker: time.NewTicker(UpdateInterval),
updateInterval: UpdateInterval,
dbConfig: dbConfig,
}
branchWatcher.conns = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{
Size: 2,
IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds,
})
return &branchWatcher
}

func (b *BranchWatcher) updateBranchState(ctx context.Context, conn *connpool.DBConn, state string, message, workflow string) error {
query, err := sqlparser.ParseAndBind(UpdateBranchJobStatusByWorkflow,
sqltypes.StringBindVariable(state),
sqltypes.StringBindVariable(message),
sqltypes.StringBindVariable(workflow))
if err != nil {
return err
}
_, err = conn.Exec(ctx, query, -1, false)
if err != nil {
return err
}
return nil
}

func (b *BranchWatcher) watch() error {
ctx := context.Background()
conn, err := b.conns.Get(ctx, nil)
if err != nil {
return err
}
defer conn.Recycle()
qr, err := conn.Exec(ctx, SeleteWorkflowNameFromBranchJobs, -1, true)
if err != nil {
return err
}
for _, row := range qr.Named().Rows {
workflow := row["workflow_name"].ToString()
query, err := sqlparser.ParseAndBind(SeleteVReplicationByWorkflow, sqltypes.StringBindVariable(workflow))
if err != nil {
return err
}
vreplication, err := conn.Exec(ctx, query, -1, true)
rows := vreplication.Named().Row()
vState := rows["state"].ToString()
message := rows["message"].ToString()
if err != nil {
return err
}
switch vState {
case "Stopped":
if strings.Contains(message, "Stopped after copy") {
err = b.updateBranchState(ctx, conn, BranchStateOfCompleted, message, workflow)
if err != nil {
return err
}
} else {
err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow)
if err != nil {
return err
}
}
case "Error":
err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow)
if err != nil {
return err
}
case "Copying":
err = b.updateBranchState(ctx, conn, BranchStateOfRunning, message, workflow)
if err != nil {
return err
}
}
}
return nil
}

func (b *BranchWatcher) Open() {
b.conns.Open(b.dbConfig, b.dbConfig, b.dbConfig)
b.ticker.Reset(b.updateInterval)
if !b.running {
go func() {
for range b.ticker.C {
b.watch()
}
}()
b.running = true
}
}
func (b *BranchWatcher) Close() {
b.conns.Close()
b.ticker.Stop()
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type stateManager struct {
vstreamer subComponent
tracker subComponent
watcher subComponent
branchWatch subComponent
qe queryEngine
txThrottler txThrottler
te txEngine
Expand Down Expand Up @@ -475,6 +476,7 @@ func (sm *stateManager) servePrimary() error {
sm.tableGC.Open()
sm.ddle.Open()
sm.tableACL.Open()
sm.branchWatch.Open()
sm.setState(topodatapb.TabletType_PRIMARY, StateServing)
return nil
}
Expand Down Expand Up @@ -503,6 +505,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er
sm.tableGC.Close()
sm.messager.Close()
sm.tracker.Close()
sm.branchWatch.Close()
sm.se.MakeNonPrimary()

if err := sm.connect(wantTabletType); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type TabletServer struct {
hs *healthStreamer
lagThrottler *throttle.Throttler
tableGC *gc.TableGC
branchWatch *BranchWatcher

// sm manages state transitions.
sm *stateManager
Expand Down Expand Up @@ -189,6 +190,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)
tsv.branchWatch = NewBranchWatcher(tsv, tsv.config.DB.DbaWithDB())

tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)
Expand All @@ -203,6 +205,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
vstreamer: tsv.vstreamer,
tracker: tsv.tracker,
watcher: tsv.watcher,
branchWatch: tsv.branchWatch,
qe: tsv.qe,
txThrottler: tsv.txThrottler,
te: tsv.te,
Expand Down
6 changes: 0 additions & 6 deletions go/vt/wrangler/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ import (
"vitess.io/vitess/go/vt/log"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
)

type BranchJob struct {
Expand Down Expand Up @@ -295,11 +293,7 @@ func GetBranchJobByWorkflow(ctx context.Context, workflow string, wr *Wrangler)
return nil, err
}
onddl := branchJobMap["onddl"].ToString()

status := branchJobMap["status"].ToString()
if status != BranchStatusOfPrePare {
return nil, vterrors.Errorf(vtrpc.Code_ABORTED, "can not start an branch which status [%v] is not prepare", status)
}
branchJob := &BranchJob{
sourceDatabase: sourceDatabase,
targetDatabase: targetDatabase,
Expand Down

0 comments on commit 6697f02

Please sign in to comment.