From 3b9bc49960c51ea97e291dc26b6e9eba21b9db26 Mon Sep 17 00:00:00 2001 From: geray <919179287@qq.com> Date: Thu, 21 Dec 2023 17:07:55 +0800 Subject: [PATCH 1/3] feat: add branch status monitor Signed-off-by: <919179287@qq.com> --- examples/workflow/Branch/prepare.sh | 2 +- go/vt/vttablet/tabletserver/branch_watch.go | 141 +++++++++++++++++++ go/vt/vttablet/tabletserver/state_manager.go | 3 + go/vt/vttablet/tabletserver/tabletserver.go | 3 + go/vt/wrangler/branch.go | 6 - 5 files changed, 148 insertions(+), 7 deletions(-) create mode 100644 go/vt/vttablet/tabletserver/branch_watch.go diff --git a/examples/workflow/Branch/prepare.sh b/examples/workflow/Branch/prepare.sh index 5927009872..80abc3e955 100755 --- a/examples/workflow/Branch/prepare.sh +++ b/examples/workflow/Branch/prepare.sh @@ -6,7 +6,7 @@ source "$(dirname "${BASH_SOURCE[0]:-$0}")/../../common/env.sh" USER_COUNT=5000 CUSTOMER_COUNT=5000 PRODUCT_COUNT=5000 -CORDER_COUNT=10000 +CORDER_COUNT=200000 mysql -h127.0.0.1 -P15306 -e 'create database if not exists branch_source' diff --git a/go/vt/vttablet/tabletserver/branch_watch.go b/go/vt/vttablet/tabletserver/branch_watch.go new file mode 100644 index 0000000000..4c4d764fb9 --- /dev/null +++ b/go/vt/vttablet/tabletserver/branch_watch.go @@ -0,0 +1,141 @@ +/* +Copyright ApeCloud, Inc. +Licensed under the Apache v2(found in the LICENSE file in the root directory). +*/ + +package tabletserver + +import ( + "context" + "strings" + "time" + + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" +) + +const SeleteWorkflowNameFromBranchJobs = "SELECT * from mysql.branch_jobs;" + +const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where workflow=%a;" + +const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a" + +const ( + BranchStateOfPrepare = "prepare" + BranchStateOfRunning = "running" + BranchStateOfStop = "Stop" + BranchStateOfCompleted = "completed" + BranchStateOfError = "error" +) + +const UpdateInterval = 5 * time.Second + +type BranchWatcher struct { + dbConfig dbconfigs.Connector + conns *connpool.Pool + + ticker *time.Ticker + updateInterval time.Duration + running bool +} + +func NewBranchWatcher(env tabletenv.Env, dbConfig dbconfigs.Connector) *BranchWatcher { + branchWatcher := BranchWatcher{ + running: false, + ticker: time.NewTicker(UpdateInterval), + updateInterval: UpdateInterval, + dbConfig: dbConfig, + } + branchWatcher.conns = connpool.NewPool(env, "", tabletenv.ConnPoolConfig{ + Size: 2, + IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, + }) + return &branchWatcher +} + +func (b *BranchWatcher) updateBranchState(ctx context.Context, conn *connpool.DBConn, state string, message, workflow string) error { + query, err := sqlparser.ParseAndBind(UpdateBranchJobStatusByWorkflow, + sqltypes.StringBindVariable(state), + sqltypes.StringBindVariable(message), + sqltypes.StringBindVariable(workflow)) + if err != nil { + return err + } + _, err = conn.Exec(ctx, query, -1, false) + if err != nil { + return err + } + return nil +} + +func (b *BranchWatcher) watch() error { + ctx := context.Background() + conn, err := b.conns.Get(ctx, nil) + if err != nil { + return err + } + defer conn.Recycle() + qr, err := conn.Exec(ctx, SeleteWorkflowNameFromBranchJobs, -1, true) + if err != nil { + return err + } + for _, row := range qr.Named().Rows { + workflow := row["workflow_name"].ToString() + query, err := sqlparser.ParseAndBind(SeleteVReplicationByWorkflow, sqltypes.StringBindVariable(workflow)) + if err != nil { + return err + } + vreplication, err := conn.Exec(ctx, query, -1, true) + rows := vreplication.Named().Row() + vState := rows["state"].ToString() + message := rows["message"].ToString() + if err != nil { + return err + } + switch vState { + case "Stopped": + if strings.Contains(message, "Stopped after copy") { + err = b.updateBranchState(ctx, conn, BranchStateOfCompleted, message, workflow) + if err != nil { + return err + } + } else { + err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow) + if err != nil { + return err + } + } + case "Error": + err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow) + if err != nil { + return err + } + case "Copying": + err = b.updateBranchState(ctx, conn, BranchStateOfRunning, message, workflow) + if err != nil { + return err + } + } + } + return nil +} + +func (b *BranchWatcher) Open() { + b.conns.Open(b.dbConfig, b.dbConfig, b.dbConfig) + b.ticker.Reset(b.updateInterval) + if !b.running { + go func() { + for range b.ticker.C { + b.watch() + } + }() + b.running = true + } +} +func (b *BranchWatcher) Close() { + b.conns.Close() + b.ticker.Stop() +} diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go index e439a31708..a12fbfe12b 100644 --- a/go/vt/vttablet/tabletserver/state_manager.go +++ b/go/vt/vttablet/tabletserver/state_manager.go @@ -121,6 +121,7 @@ type stateManager struct { vstreamer subComponent tracker subComponent watcher subComponent + branchWatch subComponent qe queryEngine txThrottler txThrottler te txEngine @@ -475,6 +476,7 @@ func (sm *stateManager) servePrimary() error { sm.tableGC.Open() sm.ddle.Open() sm.tableACL.Open() + sm.branchWatch.Open() sm.setState(topodatapb.TabletType_PRIMARY, StateServing) return nil } @@ -503,6 +505,7 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er sm.tableGC.Close() sm.messager.Close() sm.tracker.Close() + sm.branchWatch.Close() sm.se.MakeNonPrimary() if err := sm.connect(wantTabletType); err != nil { diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ea8b12f5c5..3b4ee8a7ca 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -122,6 +122,7 @@ type TabletServer struct { hs *healthStreamer lagThrottler *throttle.Throttler tableGC *gc.TableGC + branchWatch *BranchWatcher // sm manages state transitions. sm *stateManager @@ -189,6 +190,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer) tsv.te = NewTxEngine(tsv) tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer) + tsv.branchWatch = NewBranchWatcher(tsv, tsv.config.DB.DbaWithDB()) tsv.onlineDDLExecutor = onlineddl.NewExecutor(tsv, alias, topoServer, tsv.lagThrottler, tabletTypeFunc, tsv.onlineDDLExecutorToggleTableBuffer) tsv.tableGC = gc.NewTableGC(tsv, topoServer, tsv.lagThrottler) @@ -203,6 +205,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to vstreamer: tsv.vstreamer, tracker: tsv.tracker, watcher: tsv.watcher, + branchWatch: tsv.branchWatch, qe: tsv.qe, txThrottler: tsv.txThrottler, te: tsv.te, diff --git a/go/vt/wrangler/branch.go b/go/vt/wrangler/branch.go index 8b61c57191..99bff946b8 100644 --- a/go/vt/wrangler/branch.go +++ b/go/vt/wrangler/branch.go @@ -26,9 +26,7 @@ import ( "vitess.io/vitess/go/vt/log" vschemapb "vitess.io/vitess/go/vt/proto/vschema" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vterrors" ) type BranchJob struct { @@ -295,11 +293,7 @@ func GetBranchJobByWorkflow(ctx context.Context, workflow string, wr *Wrangler) return nil, err } onddl := branchJobMap["onddl"].ToString() - status := branchJobMap["status"].ToString() - if status != BranchStatusOfPrePare { - return nil, vterrors.Errorf(vtrpc.Code_ABORTED, "can not start an branch which status [%v] is not prepare", status) - } branchJob := &BranchJob{ sourceDatabase: sourceDatabase, targetDatabase: targetDatabase, From cb654ff6f4bd84872c9ca67ccf9315bf8fd8c786 Mon Sep 17 00:00:00 2001 From: geray <919179287@qq.com> Date: Fri, 22 Dec 2023 20:18:12 +0800 Subject: [PATCH 2/3] test: add branch monitor testcase Signed-off-by: <919179287@qq.com> --- go/test/endtoend/branch/branch_test.go | 125 +++++++++++++++++- .../endtoend/cluster/vtctlclient_process.go | 2 +- go/vt/vttablet/tabletserver/branch_watch.go | 12 +- .../tabletserver/vstreamer/planbuilder.go | 4 +- 4 files changed, 131 insertions(+), 12 deletions(-) diff --git a/go/test/endtoend/branch/branch_test.go b/go/test/endtoend/branch/branch_test.go index fd16758803..ab1da6d3e9 100644 --- a/go/test/endtoend/branch/branch_test.go +++ b/go/test/endtoend/branch/branch_test.go @@ -29,6 +29,16 @@ import ( "vitess.io/vitess/go/mysql" ) +type BranchStatus string + +const ( + BranchStateOfPrepare = "Prepare" + BranchStateOfRunning = "Running" + BranchStateOfStop = "Stop" + BranchStateOfCompleted = "Completed" + BranchStateOfError = "Error" +) + func insertUsers(dbConn *mysql.Conn, end int) error { start := 1 batchSize := 1000 @@ -265,7 +275,7 @@ func checkStateOfVreplication(t *testing.T, uuid, expectState string) { } func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStates ...string) string { - query, err := sqlparser.ParseAndBind("select state from mysql.vreplication where workflow=%a", + query, err := sqlparser.ParseAndBind("select * from mysql.vreplication where workflow=%a", sqltypes.StringBindVariable(workflow), ) require.NoError(t, err) @@ -279,7 +289,7 @@ func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow r := onlineddl.VtgateExecQuery(t, vtParams, query, "") for _, row := range r.Named().Rows { lastKnownVreplicationState = row["state"].ToString() - + t.Logf("message : %v", row["message"].ToString()) if statesMap[lastKnownVreplicationState] { return lastKnownVreplicationState } @@ -315,7 +325,30 @@ func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []clu assert.Equal(t, len(shards), count) } -// WaitForMigrationStatus waits for a migration to reach either provided statuses (returns immediately), or eventually time out +func CheckBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, expectStatuses ...BranchStatus) { + query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a", + sqltypes.StringBindVariable(workflow), + ) + require.NoError(t, err) + + r := VtgateExecQuery(t, vtParams, query, "") + fmt.Printf("# output for `%s`:\n", query) + + count := 0 + for _, row := range r.Named().Rows { + if row["workflow_name"].ToString() != workflow { + continue + } + for _, expectStatus := range expectStatuses { + if row["status"].ToString() == string(expectStatus) { + count++ + break + } + } + } + assert.Equal(t, 1, count) +} + func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus { shardNames := map[string]bool{} for _, shard := range shards { @@ -358,6 +391,45 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c return schema.OnlineDDLStatus(lastKnownStatus) } +// WaitForBranchStatus waits for a branch to reach either provided statuses (returns immediately), or eventually time out +func WaitForBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStatuses ...BranchStatus) BranchStatus { + query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a", + sqltypes.StringBindVariable(workflow), + ) + require.NoError(t, err) + + statusesMap := map[string]bool{} + for _, status := range expectStatuses { + statusesMap[string(status)] = true + } + startTime := time.Now() + lastKnownStatus := "" + for time.Since(startTime) < timeout { + countMatchedShards := 0 + r := VtgateExecQuery(t, vtParams, query, "") + for _, row := range r.Named().Rows { + workflowName := row["workflow_name"].ToString() + if workflow != workflowName { + // irrelevant shard + continue + } + lastKnownStatus = row["status"].ToString() + message := row["message"].ToString() + if lastKnownStatus == BranchStateOfError { + t.Logf("schemaMigration fail, message : %v", message) + } + if row["migration_uuid"].ToString() == workflow && statusesMap[lastKnownStatus] { + countMatchedShards++ + } + } + if countMatchedShards == 1 { + return BranchStatus(lastKnownStatus) + } + time.Sleep(1 * time.Second) + } + return BranchStatus(lastKnownStatus) +} + func TestBranchGoFakeitFunction(t *testing.T) { workflowName := "branch_test" t.Run("prepare branch", func(t *testing.T) { @@ -408,6 +480,16 @@ func TestBranchGoFakeitFunction(t *testing.T) { func TestBranchMergeBack(t *testing.T) { workflowName := "TestBranchMergeBack" defer func() { + ctx := context.Background() + branchSourceParams := mysql.ConnParams{ + Host: clusterInstance.Hostname, + Port: clusterInstance.VtgateMySQLPort, + DbName: "branch_source", + } + conn, err := mysql.Connect(ctx, &branchSourceParams) + require.Nil(t, err) + _, err = conn.ExecuteFetch("DROP TABLE IF EXISTS news_table;\n", -1, false) + require.Nil(t, err) CleanupDatabase(t) clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName) }() @@ -517,3 +599,40 @@ func TestPrepareBranch(t *testing.T) { require.True(t, strings.HasPrefix(output, "successfully")) defer CleanupDatabase(t) } + +func TestBranchWatcher(t *testing.T) { + workflowName := "TestBranchWatcher" + defer func() { + CleanupDatabase(t) + clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName) + }() + t.Run("prepare branch", func(t *testing.T) { + output, err := clusterInstance.VtctlclientProcess.PrepareBranch(workflowName, "branch_source", "branch_target", "", "", "", "", false, "RAND()<0.1", false) + require.Nil(t, err) + require.True(t, strings.HasPrefix(output, "successfully")) + time.Sleep(2 * time.Second) + CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfPrepare) + }) + t.Run("update filterling rules", func(t *testing.T) { + VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select id, gofakeit_generate(\'{firstname}:###:???:{moviename}\') as name from user WHERE id<=100' where source_table_name = 'user';`, "") + VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select customer_id, gofakeit_bytype(\'regex\',\'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$\') as email from customer WHERE customer_id<=100' where source_table_name = 'customer';`, "") + VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select sku,description,gofakeit_bytype(\'intrange\',110,150) as price,gofakeit_bytype(\'floatrange\',23.5,23.9) as weight from product' where source_table_name = 'product';`, "") + VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='SELECT order_id,gofakeit_bytype(\'bigint\') as customer_id,gofakeit_generate(\'{firstname}:###:???:{moviename}\') as sku,gofakeit_bytype(\'bigint\') as price FROM corder where customer_id<=100' where source_table_name = 'corder';`, "") + }) + t.Run("start branch", func(t *testing.T) { + output, err := clusterInstance.VtctlclientProcess.StartBranch(workflowName) + require.Nil(t, err) + require.True(t, strings.HasSuffix(output, "successfully.")) + RequireVRplicationExist(t, workflowName) + clusterInstance.VtctlclientProcess.StopBranch(workflowName) + time.Sleep(2 * time.Second) + CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfStop) + // start again + output, err = clusterInstance.VtctlclientProcess.StartBranch(workflowName) + require.Nil(t, err) + require.True(t, strings.HasSuffix(output, "successfully.")) + WaitForVreplicationState(t, &vtParams, workflowName, 5*time.Second, "Stopped") + time.Sleep(3 * time.Second) + CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfCompleted) + }) +} diff --git a/go/test/endtoend/cluster/vtctlclient_process.go b/go/test/endtoend/cluster/vtctlclient_process.go index 39a4d64aca..919582b65b 100644 --- a/go/test/endtoend/cluster/vtctlclient_process.go +++ b/go/test/endtoend/cluster/vtctlclient_process.go @@ -158,7 +158,7 @@ func (vtctlclient *VtctlClientProcess) PrepareBranch(workflow, sourceDatabase, t args = append(args, "--stop_after_copy") } if defaultFilterRules != "" { - args = append(args, "--default_filter_rules", fmt.Sprintf("\"%v\"", defaultFilterRules)) + args = append(args, "--default_filter_rules", fmt.Sprintf("%v", defaultFilterRules)) } if !skipCopyPhase { args = append(args, "--skip_copy_phase=false") diff --git a/go/vt/vttablet/tabletserver/branch_watch.go b/go/vt/vttablet/tabletserver/branch_watch.go index 4c4d764fb9..a1a434dbf3 100644 --- a/go/vt/vttablet/tabletserver/branch_watch.go +++ b/go/vt/vttablet/tabletserver/branch_watch.go @@ -24,14 +24,14 @@ const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where wor const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a" const ( - BranchStateOfPrepare = "prepare" - BranchStateOfRunning = "running" + BranchStateOfPrepare = "Prepare" + BranchStateOfRunning = "Running" BranchStateOfStop = "Stop" - BranchStateOfCompleted = "completed" - BranchStateOfError = "error" + BranchStateOfCompleted = "Completed" + BranchStateOfError = "Error" ) -const UpdateInterval = 5 * time.Second +const UpdateInterval = 2 * time.Second type BranchWatcher struct { dbConfig dbconfigs.Connector @@ -103,7 +103,7 @@ func (b *BranchWatcher) watch() error { return err } } else { - err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow) + err = b.updateBranchState(ctx, conn, BranchStateOfStop, message, workflow) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index 1ae146b81a..a889b8a880 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -635,13 +635,13 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er } case *sqlparser.FuncExpr: if !expr.Name.EqualString("in_keyrange") { - return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + return fmt.Errorf("unsupported constraint1: %v", sqlparser.String(expr)) } if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { return err } default: - return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) + return fmt.Errorf("unsupported constraint2: %v", sqlparser.String(expr)) } } return nil From 2041c2c6972173ec3189347c2a44a73fdf1925ee Mon Sep 17 00:00:00 2001 From: geray <919179287@qq.com> Date: Thu, 4 Jan 2024 10:21:32 +0800 Subject: [PATCH 3/3] test: fix tablePlan test Signed-off-by: <919179287@qq.com> --- go/vt/vttablet/tabletserver/vstreamer/planbuilder.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go index a889b8a880..1ae146b81a 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go +++ b/go/vt/vttablet/tabletserver/vstreamer/planbuilder.go @@ -635,13 +635,13 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er } case *sqlparser.FuncExpr: if !expr.Name.EqualString("in_keyrange") { - return fmt.Errorf("unsupported constraint1: %v", sqlparser.String(expr)) + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil { return err } default: - return fmt.Errorf("unsupported constraint2: %v", sqlparser.String(expr)) + return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr)) } } return nil