Skip to content

Commit

Permalink
fix: split large functions into smaller ones
Browse files Browse the repository at this point in the history
Signed-off-by: newborn22 <[email protected]>
  • Loading branch information
newborn22 committed Jan 10, 2024
1 parent 47b6a8d commit 83e7beb
Show file tree
Hide file tree
Showing 6 changed files with 439 additions and 355 deletions.
79 changes: 79 additions & 0 deletions go/vt/vttablet/jobcontroller/batch_sql_related.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@ Licensed under the Apache v2(found in the LICENSE file in the root directory).
package jobcontroller

import (
"context"
"errors"
"fmt"
"math"

"vitess.io/vitess/go/vt/vttablet/tabletserver/connpool"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/sqlparser"
Expand Down Expand Up @@ -211,3 +215,78 @@ func genNewBatchSQLsAndCountSQLsWhenSplittingBatch(batchSQLStmt, batchCountSQLSt
newBatchCountSQL = genSQLByReplaceWhereExprNode(batchCountSQLStmt, newBatchWhereExpr)
return curBatchSQL, newBatchSQL, newBatchCountSQL, nil
}

// replace selectExprs in batchCountSQLStmt with PK cols to generate selectPKsSQL
// the function will not change the original batchCountSQLStmt
func genSelectPKsSQL(batchCountSQLStmt sqlparser.Statement, pkInfos []PKInfo) string {
batchCountSQLStmtSelect, _ := batchCountSQLStmt.(*sqlparser.Select)
// 根据pk信息生成select exprs
var pkExprs []sqlparser.SelectExpr
for _, pkInfo := range pkInfos {
pkExprs = append(pkExprs, &sqlparser.AliasedExpr{Expr: sqlparser.NewColName(pkInfo.pkName)})
}
oldBatchCountSQLStmtSelectExprs := batchCountSQLStmtSelect.SelectExprs
batchCountSQLStmtSelect.SelectExprs = pkExprs
batchSplitSelectSQL := sqlparser.String(batchCountSQLStmtSelect)
// undo the change of select exprs
batchCountSQLStmtSelect.SelectExprs = oldBatchCountSQLStmtSelectExprs
return batchSplitSelectSQL
}

// get the begin and end fields of the batches newly created during splitting
func getNewBatchesBeginAndEndStr(ctx context.Context, conn *connpool.DBConn, batchTable, batchID string, curBatchNewEnd, newBatchStart []sqltypes.Value) (currentBatchNewBeginStr, currentBatchNewEndStr, newBatchBeginStr, newBatchEndStr string, err error) {
getBatchBeginAndEndSQL := fmt.Sprintf(sqlTemplateGetBatchBeginAndEnd, batchTable)
getBatchBeginAndEndQuery, err := sqlparser.ParseAndBind(getBatchBeginAndEndSQL, sqltypes.StringBindVariable(batchID))
if err != nil {
return "", "", "", "", err
}
qr, err := conn.Exec(ctx, getBatchBeginAndEndQuery, math.MaxInt32, true)
if err != nil {
return "", "", "", "", err
}
if len(qr.Named().Rows) != 1 {
return "", "", "", "", errors.New("can not get batch begin and end")
}
currentBatchNewBeginStr = qr.Named().Rows[0]["batch_begin"].ToString()
newBatchEndStr = qr.Named().Rows[0]["batch_end"].ToString()
currentBatchNewEndStr, newBatchBeginStr, err = genBatchStartAndEndStr(curBatchNewEnd, newBatchStart)
if err != nil {
return "", "", "", "", err
}
return currentBatchNewBeginStr, currentBatchNewEndStr, newBatchBeginStr, newBatchEndStr, nil
}
func updateBatchInfoTableEntry(ctx context.Context, conn *connpool.DBConn, batchTable string, curBatchSQL, currentBatchNewBeginStr, currentBatchNewEndStr, batchID string) (err error) {
sqlUpdateBatchInfoTableEntry := fmt.Sprintf(sqlTemplateUpdateBatchSQL, batchTable)
queryUpdateBatchInfoTableEntry, err := sqlparser.ParseAndBind(sqlUpdateBatchInfoTableEntry,
sqltypes.StringBindVariable(curBatchSQL),
sqltypes.StringBindVariable(currentBatchNewBeginStr),
sqltypes.StringBindVariable(currentBatchNewEndStr),
sqltypes.StringBindVariable(batchID))
if err != nil {
return err
}
_, err = conn.Exec(ctx, queryUpdateBatchInfoTableEntry, math.MaxInt32, false)
if err != nil {
return err
}
return nil
}

func insertBatchInfoTableEntry(ctx context.Context, conn *connpool.DBConn, batchTable, nextBatchID, newBatchSQL, newBatchCountSQL, newBatchBeginStr, newBatchEndStr string, newBatchSize int64) (err error) {
sqlInsertBatchInfoTableEntry := fmt.Sprintf(sqlTemplateInsertBatchEntry, batchTable)
queryInsertBatchInfoTableEntry, err := sqlparser.ParseAndBind(sqlInsertBatchInfoTableEntry,
sqltypes.StringBindVariable(nextBatchID),
sqltypes.StringBindVariable(newBatchSQL),
sqltypes.StringBindVariable(newBatchCountSQL),
sqltypes.Int64BindVariable(newBatchSize),
sqltypes.StringBindVariable(newBatchBeginStr),
sqltypes.StringBindVariable(newBatchEndStr))
if err != nil {
return err
}
_, err = conn.Exec(ctx, queryInsertBatchInfoTableEntry, math.MaxInt32, false)
if err != nil {
return err
}
return nil
}
11 changes: 11 additions & 0 deletions go/vt/vttablet/jobcontroller/batch_sql_related_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,14 @@ func TestStripComments(t *testing.T) {
})
}
}

func TestGenSelectPKsSQL(t *testing.T) {
batchCountSQL := "select count(*) from t where 1 = 1"
batchCountSQLStmt, _ := sqlparser.Parse(batchCountSQL)
pkInfos := []PKInfo{{pkName: "pk1"}, {pkName: "pk2"}}
gotSQL := genSelectPKsSQL(batchCountSQLStmt, pkInfos)
expectedSQL := "select pk1, pk2 from t where 1 = 1"
assert.Equalf(t, expectedSQL, gotSQL, "genSelectPKsSQL(%v,%v)", batchCountSQLStmt, pkInfos)
// batchCountSQLStmt should not be changed
assert.Equalf(t, batchCountSQL, sqlparser.String(batchCountSQLStmt), "genSelectPKsSQL(%v,%v)", batchCountSQLStmt, pkInfos)
}
Loading

0 comments on commit 83e7beb

Please sign in to comment.