Skip to content

Commit

Permalink
test: add branch monitor testcase
Browse files Browse the repository at this point in the history
Signed-off-by: <[email protected]>
  • Loading branch information
gerayking committed Dec 22, 2023
1 parent 4336482 commit d66df47
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 12 deletions.
125 changes: 122 additions & 3 deletions go/test/endtoend/branch/branch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@ import (
"vitess.io/vitess/go/mysql"
)

type BranchStatus string

const (
BranchStateOfPrepare = "Prepare"
BranchStateOfRunning = "Running"
BranchStateOfStop = "Stop"
BranchStateOfCompleted = "Completed"
BranchStateOfError = "Error"
)

func insertUsers(dbConn *mysql.Conn, end int) error {
start := 1
batchSize := 1000
Expand Down Expand Up @@ -265,7 +275,7 @@ func checkStateOfVreplication(t *testing.T, uuid, expectState string) {
}

func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStates ...string) string {
query, err := sqlparser.ParseAndBind("select state from mysql.vreplication where workflow=%a",
query, err := sqlparser.ParseAndBind("select * from mysql.vreplication where workflow=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)
Expand All @@ -280,7 +290,7 @@ func WaitForVreplicationState(t *testing.T, vtParams *mysql.ConnParams, workflow
r := onlineddl.VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
lastKnownVreplicationState = row["state"].ToString()

t.Logf("message : %v", row["message"].ToString())
if statesMap[lastKnownVreplicationState] {
return lastKnownVreplicationState
}
Expand Down Expand Up @@ -316,7 +326,30 @@ func CheckMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []clu
assert.Equal(t, len(shards), count)
}

// WaitForMigrationStatus waits for a migration to reach either provided statuses (returns immediately), or eventually time out
func CheckBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, expectStatuses ...BranchStatus) {
query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)

r := VtgateExecQuery(t, vtParams, query, "")
fmt.Printf("# output for `%s`:\n", query)

count := 0
for _, row := range r.Named().Rows {
if row["workflow_name"].ToString() != workflow {
continue
}
for _, expectStatus := range expectStatuses {
if row["status"].ToString() == string(expectStatus) {
count++
break
}
}
}
assert.Equal(t, 1, count)
}

func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []cluster.Shard, uuid string, timeout time.Duration, expectStatuses ...schema.OnlineDDLStatus) schema.OnlineDDLStatus {
shardNames := map[string]bool{}
for _, shard := range shards {
Expand Down Expand Up @@ -359,6 +392,45 @@ func WaitForMigrationStatus(t *testing.T, vtParams *mysql.ConnParams, shards []c
return schema.OnlineDDLStatus(lastKnownStatus)
}

// WaitForBranchStatus waits for a branch to reach either provided statuses (returns immediately), or eventually time out
func WaitForBranchStatus(t *testing.T, vtParams *mysql.ConnParams, workflow string, timeout time.Duration, expectStatuses ...BranchStatus) BranchStatus {
query, err := sqlparser.ParseAndBind("select * from mysql.branch_jobs where workflow_name=%a",
sqltypes.StringBindVariable(workflow),
)
require.NoError(t, err)

statusesMap := map[string]bool{}
for _, status := range expectStatuses {
statusesMap[string(status)] = true
}
startTime := time.Now()
lastKnownStatus := ""
for time.Since(startTime) < timeout {
countMatchedShards := 0
r := VtgateExecQuery(t, vtParams, query, "")
for _, row := range r.Named().Rows {
workflowName := row["workflow_name"].ToString()
if workflow != workflowName {
// irrelevant shard
continue
}
lastKnownStatus = row["status"].ToString()
message := row["message"].ToString()
if lastKnownStatus == BranchStateOfError {
t.Logf("schemaMigration fail, message : %v", message)
}
if row["migration_uuid"].ToString() == workflow && statusesMap[lastKnownStatus] {
countMatchedShards++
}
}
if countMatchedShards == 1 {
return BranchStatus(lastKnownStatus)
}
time.Sleep(1 * time.Second)
}
return BranchStatus(lastKnownStatus)
}

func TestBranchGoFakeitFunction(t *testing.T) {
workflowName := "branch_test"
t.Run("prepare branch", func(t *testing.T) {
Expand Down Expand Up @@ -409,6 +481,16 @@ func TestBranchGoFakeitFunction(t *testing.T) {
func TestBranchMergeBack(t *testing.T) {
workflowName := "TestBranchMergeBack"
defer func() {
ctx := context.Background()
branchSourceParams := mysql.ConnParams{
Host: clusterInstance.Hostname,
Port: clusterInstance.VtgateMySQLPort,
DbName: "branch_source",
}
conn, err := mysql.Connect(ctx, &branchSourceParams)
require.Nil(t, err)
_, err = conn.ExecuteFetch("DROP TABLE IF EXISTS news_table;\n", -1, false)
require.Nil(t, err)
CleanupDatabase(t)
clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName)
}()
Expand Down Expand Up @@ -518,3 +600,40 @@ func TestPrepareBranch(t *testing.T) {
require.True(t, strings.HasPrefix(output, "successfully"))
defer CleanupDatabase(t)
}

func TestBranchWatcher(t *testing.T) {
workflowName := "TestBranchWatcher"
defer func() {
CleanupDatabase(t)
clusterInstance.VtctlclientProcess.Cleanupbranch(workflowName)
}()
t.Run("prepare branch", func(t *testing.T) {
output, err := clusterInstance.VtctlclientProcess.PrepareBranch(workflowName, "branch_source", "branch_target", "", "", "", "", false, "RAND()<0.1", false)
require.Nil(t, err)
require.True(t, strings.HasPrefix(output, "successfully"))
time.Sleep(2 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfPrepare)
})
t.Run("update filterling rules", func(t *testing.T) {
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select id, gofakeit_generate(\'{firstname}:###:???:{moviename}\') as name from user WHERE id<=100' where source_table_name = 'user';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select customer_id, gofakeit_bytype(\'regex\',\'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$\') as email from customer WHERE customer_id<=100' where source_table_name = 'customer';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='select sku,description,gofakeit_bytype(\'intrange\',110,150) as price,gofakeit_bytype(\'floatrange\',23.5,23.9) as weight from product' where source_table_name = 'product';`, "")
VtgateExecQuery(t, &vtParams, `update mysql.branch_table_rules set filtering_rule='SELECT order_id,gofakeit_bytype(\'bigint\') as customer_id,gofakeit_generate(\'{firstname}:###:???:{moviename}\') as sku,gofakeit_bytype(\'bigint\') as price FROM corder where customer_id<=100' where source_table_name = 'corder';`, "")
})
t.Run("start branch", func(t *testing.T) {
output, err := clusterInstance.VtctlclientProcess.StartBranch(workflowName)
require.Nil(t, err)
require.True(t, strings.HasSuffix(output, "successfully."))
RequireVRplicationExist(t, workflowName)
clusterInstance.VtctlclientProcess.StopBranch(workflowName)
time.Sleep(2 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfStop)
// start again
output, err = clusterInstance.VtctlclientProcess.StartBranch(workflowName)
require.Nil(t, err)
require.True(t, strings.HasSuffix(output, "successfully."))
WaitForVreplicationState(t, &vtParams, workflowName, 5*time.Second, "Stopped")
time.Sleep(3 * time.Second)
CheckBranchStatus(t, &vtParams, workflowName, BranchStateOfCompleted)
})
}
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctlclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (vtctlclient *VtctlClientProcess) PrepareBranch(workflow, sourceDatabase, t
args = append(args, "--stop_after_copy")
}
if defaultFilterRules != "" {
args = append(args, "--default_filter_rules", fmt.Sprintf("\"%v\"", defaultFilterRules))
args = append(args, "--default_filter_rules", fmt.Sprintf("%v", defaultFilterRules))
}
if !skipCopyPhase {
args = append(args, "--skip_copy_phase=false")
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletserver/branch_watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ const SeleteVReplicationByWorkflow = "SELECT * from mysql.vreplication where wor
const UpdateBranchJobStatusByWorkflow = "update mysql.branch_jobs set status=%a,message=%a where workflow_name=%a"

const (
BranchStateOfPrepare = "prepare"
BranchStateOfRunning = "running"
BranchStateOfPrepare = "Prepare"
BranchStateOfRunning = "Running"
BranchStateOfStop = "Stop"
BranchStateOfCompleted = "completed"
BranchStateOfError = "error"
BranchStateOfCompleted = "Completed"
BranchStateOfError = "Error"
)

const UpdateInterval = 5 * time.Second
const UpdateInterval = 2 * time.Second

type BranchWatcher struct {
dbConfig dbconfigs.Connector
Expand Down Expand Up @@ -103,7 +103,7 @@ func (b *BranchWatcher) watch() error {
return err
}
} else {
err = b.updateBranchState(ctx, conn, BranchStateOfError, message, workflow)
err = b.updateBranchState(ctx, conn, BranchStateOfStop, message, workflow)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/vstreamer/planbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,13 +635,13 @@ func (plan *Plan) analyzeWhere(vschema *localVSchema, where *sqlparser.Where) er
}
case *sqlparser.FuncExpr:
if !expr.Name.EqualString("in_keyrange") {
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
return fmt.Errorf("unsupported constraint1: %v", sqlparser.String(expr))
}
if err := plan.analyzeInKeyRange(vschema, expr.Exprs); err != nil {
return err
}
default:
return fmt.Errorf("unsupported constraint: %v", sqlparser.String(expr))
return fmt.Errorf("unsupported constraint2: %v", sqlparser.String(expr))
}
}
return nil
Expand Down

0 comments on commit d66df47

Please sign in to comment.