Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add branch status monitor #395

Merged
merged 3 commits into from
Jan 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/workflow/Branch/prepare.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ source "$(dirname "${BASH_SOURCE[0]:-$0}")/../../common/env.sh"
USER_COUNT=5000
CUSTOMER_COUNT=5000
PRODUCT_COUNT=5000
CORDER_COUNT=10000
CORDER_COUNT=200000

mysql -h127.0.0.1 -P15306 -e 'create database if not exists branch_source'

Expand Down
125 changes: 122 additions & 3 deletions go/test/endtoend/branch/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import (
"vitess.io/vitess/go/mysql"
)

type BranchStatus string

const (
BranchStateOfPrepare = "Prepare"
BranchStateOfRunning = "Running"
BranchStateOfStop = "Stop"
BranchStateOfCompleted = "Completed"
BranchStateOfError = "Error"
)

func insertUsers(dbConn *mysql.Conn, end int) error {
start := 1
batchSize := 1000
Expand Down Expand Up @@ -265,7 +275,7 @@ func checkStateOfVreplication(t *testing.T, uuid, expectState string) {
}

func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStates ...string) string {
query, err := sqlparser.ParseAndBind("select state from mysql.vreplication where workflow=%a",
query, err := sqlparser.ParseAndBind("select * from mysql.vreplication where workflow=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)
Expand All @@ -279,7 +289,7 @@ func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow
r := onlineddl.VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
lastKnownVreplicationState = row["state"].ToString()

t.Logf("message : %v", row["message"].ToString())
if statesMap[lastKnownVreplicationState] {
return lastKnownVreplicationState
}
Expand Down Expand Up @@ -315,7 +325,30 @@ func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []clu
assert.Equal(t, len(shards), count)
}

// WaitForMigrationStatus waits for a migration to reach either provided statuses (returns immediately), or eventually time out
func CheckBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, expectStatuses ...BranchStatus) {
query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)

r := VtgateExecQuery(t, vtParams, query, "")
fmt.Printf("# output for `%s`:\n", query)

count := 0
for _, row := range r.Named().Rows {
if row["workflow_name"].ToString() != workflow {
continue
}
for _, expectStatus := range expectStatuses {
if row["status"].ToString() == string(expectStatus) {
count++
break
}
}
}
assert.Equal(t, 1, count)
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
Expand Down Expand Up @@ -358,6 +391,45 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
return schema.OnlineDDLStatus(lastKnownStatus)
}

// WaitForBranchStatus waits for a branch to reach either provided statuses (returns immediately), or eventually time out
func WaitForBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStatuses ...BranchStatus) BranchStatus {
query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
startTime := time.Now()
lastKnownStatus := ""
for time.Since(startTime) < timeout {
countMatchedShards := 0
r := VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
workflowName := row["workflow_name"].ToString()
if workflow != workflowName {
// irrelevant shard
continue
}
lastKnownStatus = row["status"].ToString()
message := row["message"].ToString()
if lastKnownStatus == BranchStateOfError {
t.Logf("schemaMigration fail, message : %v", message)
}
if row["migration_uuid"].ToString() == workflow && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == 1 {
return BranchStatus(lastKnownStatus)
}
time.Sleep(1 * time.Second)
}
return BranchStatus(lastKnownStatus)
}

func TestBranchGoFakeitFunction(t *testing.T) {
workflowName := "branch_test"
t.Run("prepare branch", func(t *testing.T) {
Expand Down Expand Up @@ -408,6 +480,16 @@ func TestBranchGoFakeitFunction(t *testing.T) {
func TestBranchMergeBack(t *testing.T) {
workflowName := "TestBranchMergeBack"
defer func() {
ctx := context.Background()
branchSourceParams := mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
DbName: "branch_source",
}
conn, err := mysql.Connect(ctx, &branchSourceParams)
require.Nil(t, err)
_, err = conn.ExecuteFetch("DROP TABLE IF EXISTS news_table;\n", -1, false)
require.Nil(t, err)
CleanupDatabase(t)
clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName)
}()
Expand Down Expand Up @@ -517,3 +599,40 @@ func TestPrepareBranch(t *testing.T) {
require.True(t, strings.HasPrefix(output, "successfully"))
defer CleanupDatabase(t)
}

func TestBranchWatcher(t *testing.T) {
workflowName := "TestBranchWatcher"
defer func() {
CleanupDatabase(t)
clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName)
}()
t.Run("prepare branch", func(t *testing.T) {
output, err := clusterInstance.VtctlclientProcess.PrepareBranch(workflowName, "branch_source", "branch_target", "", "", "", "", false, "RAND()<0.1", false)
require.Nil(t, err)
require.True(t, strings.HasPrefix(output, "successfully"))
time.Sleep(2 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfPrepare)
})
t.Run("update filterling rules", func(t *testing.T) {
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select id, gofakeit_generate(\'{firstname}:###:???:{moviename}\') as name from user WHERE id<=100' where source_table_name = 'user';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select customer_id, gofakeit_bytype(\'regex\',\'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$\') as email from customer WHERE customer_id<=100' where source_table_name = 'customer';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select sku,description,gofakeit_bytype(\'intrange\',110,150) as price,gofakeit_bytype(\'floatrange\',23.5,23.9) as weight from product' where source_table_name = 'product';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='SELECT order_id,gofakeit_bytype(\'bigint\') as customer_id,gofakeit_generate(\'{firstname}:###:???:{moviename}\') as sku,gofakeit_bytype(\'bigint\') as price FROM corder where customer_id<=100' where source_table_name = 'corder';`, "")
})
t.Run("start branch", func(t *testing.T) {
output, err := clusterInstance.VtctlclientProcess.StartBranch(workflowName)
require.Nil(t, err)
require.True(t, strings.HasSuffix(output, "successfully."))
RequireVRplicationExist(t, workflowName)
clusterInstance.VtctlclientProcess.StopBranch(workflowName)
time.Sleep(2 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfStop)
// start again
output, err = clusterInstance.VtctlclientProcess.StartBranch(workflowName)
require.Nil(t, err)
require.True(t, strings.HasSuffix(output, "successfully."))
WaitForVreplicationState(t, &vtParams, workflowName, 5*time.Second, "Stopped")
time.Sleep(3 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfCompleted)
})
}
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (vtctlclient *VtctlClientProcess) PrepareBranch(workflow, sourceDatabase, t
args = append(args, "--stop_after_copy")
}
if defaultFilterRules != "" {
args = append(args, "--default_filter_rules", fmt.Sprintf("\"%v\"", defaultFilterRules))
args = append(args, "--default_filter_rules", fmt.Sprintf("%v", defaultFilterRules))
}
if !skipCopyPhase {
args = append(args, "--skip_copy_phase=false")
Expand Down
141 changes: 141 additions & 0 deletions go/vt/vttablet/tabletserver/branch_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright ApeCloud, Inc.
Licensed under the Apache v2(found in the LICENSE file in the root directory).
*/

package tabletserver

import (
"context"
"strings"
"time"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)

const SeleteWorkflowNameFromBranchJobs = "SELECT * from mysql.branch_jobs;"

const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where workflow=%a;"

const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a"

const (
BranchStateOfPrepare = "Prepare"
BranchStateOfRunning = "Running"
BranchStateOfStop = "Stop"
BranchStateOfCompleted = "Completed"
BranchStateOfError = "Error"
)

const UpdateInterval = 2 * time.Second

type BranchWatcher struct {
dbConfig dbconfigs.Connector
conns *connpool.Pool

ticker *time.Ticker
updateInterval time.Duration
running bool
}

func NewBranchWatcher(env tabletenv.Env, dbConfig dbconfigs.Connector) *BranchWatcher {
branchWatcher := BranchWatcher{
running: false,
ticker: time.NewTicker(UpdateInterval),
updateInterval: UpdateInterval,
dbConfig: dbConfig,
}
branchWatcher.conns = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{
Size: 2,
IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds,
})
return &branchWatcher
}

func (b *BranchWatcher) updateBranchState(ctx context.Context, conn *connpool.DBConn, state string, message, workflow string) error {
query, err := sqlparser.ParseAndBind(UpdateBranchJobStatusByWorkflow,
sqltypes.StringBindVariable(state),
sqltypes.StringBindVariable(message),
sqltypes.StringBindVariable(workflow))
if err != nil {
return err
}
_, err = conn.Exec(ctx, query, -1, false)
if err != nil {
return err
}
return nil
}

func (b *BranchWatcher) watch() error {
ctx := context.Background()
conn, err := b.conns.Get(ctx, nil)
if err != nil {
return err
}
defer conn.Recycle()
qr, err := conn.Exec(ctx, SeleteWorkflowNameFromBranchJobs, -1, true)
if err != nil {
return err
}
for _, row := range qr.Named().Rows {
workflow := row["workflow_name"].ToString()
query, err := sqlparser.ParseAndBind(SeleteVReplicationByWorkflow, sqltypes.StringBindVariable(workflow))
if err != nil {
return err
}
vreplication, err := conn.Exec(ctx, query, -1, true)
rows := vreplication.Named().Row()
vState := rows["state"].ToString()
message := rows["message"].ToString()
if err != nil {
return err
}
switch vState {
case "Stopped":
if strings.Contains(message, "Stopped after copy") {
err = b.updateBranchState(ctx, conn, BranchStateOfCompleted, message, workflow)
if err != nil {
return err
}
} else {
err = b.updateBranchState(ctx, conn, BranchStateOfStop, message, workflow)
if err != nil {
return err
}
}
case "Error":
err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow)
if err != nil {
return err
}
case "Copying":
err = b.updateBranchState(ctx, conn, BranchStateOfRunning, message, workflow)
if err != nil {
return err
}
}
}
return nil
}

func (b *BranchWatcher) Open() {
b.conns.Open(b.dbConfig, b.dbConfig, b.dbConfig)
b.ticker.Reset(b.updateInterval)
if !b.running {
go func() {
for range b.ticker.C {
b.watch()
}
}()
b.running = true
}
}
func (b *BranchWatcher) Close() {
b.conns.Close()
b.ticker.Stop()
}
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ type stateManager struct {
vstreamer subComponent
tracker subComponent
watcher subComponent
branchWatch subComponent
qe queryEngine
txThrottler txThrottler
te txEngine
Expand Down Expand Up @@ -475,6 +476,7 @@ func (sm *stateManager) servePrimary() error {
sm.tableGC.Open()
sm.ddle.Open()
sm.tableACL.Open()
sm.branchWatch.Open()
sm.setState(topodatapb.TabletType_PRIMARY, StateServing)
return nil
}
Expand Down Expand Up @@ -503,6 +505,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er
sm.tableGC.Close()
sm.messager.Close()
sm.tracker.Close()
sm.branchWatch.Close()
sm.se.MakeNonPrimary()

if err := sm.connect(wantTabletType); err != nil {
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type TabletServer struct {
hs *healthStreamer
lagThrottler *throttle.Throttler
tableGC *gc.TableGC
branchWatch *BranchWatcher

// sm manages state transitions.
sm *stateManager
Expand Down Expand Up @@ -189,6 +190,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)
tsv.branchWatch = NewBranchWatcher(tsv, tsv.config.DB.DbaWithDB())

tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer)
tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler)
Expand All @@ -203,6 +205,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
vstreamer: tsv.vstreamer,
tracker: tsv.tracker,
watcher: tsv.watcher,
branchWatch: tsv.branchWatch,
qe: tsv.qe,
txThrottler: tsv.txThrottler,
te: tsv.te,
Expand Down
Loading
Loading