Skip to content

Commit

Permalink
fix: comment
Browse files Browse the repository at this point in the history
  • Loading branch information
newborn22 committed Dec 3, 2024
1 parent 53192ae commit f6cda92
Showing 1 changed file with 70 additions and 46 deletions.
116 changes: 70 additions & 46 deletions go/vt/vtgate/branch/branch_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,12 +249,24 @@ func (bs *BranchService) BranchDiff(name string, includeDatabases, excludeDataba
}
}

// 根据option,计算schema
// 不对输入的meta做参数校验,默认输入为正确情况
// 幂等性:每次执行时都删除旧的ddl条目,重新计算并插入。
// 只有状态为preparing,prepared,merged,created时才能执行
// override: 试图将target分支覆盖掉source分支,
// merge:试图将target相当于snapshot的修改合并到source分支,会通过三路合并算法进行合并的冲突检测,若冲突则返回错误
// BranchPrepareMergeBack prepares target branch for merging changes back into source branch based on the specified merge option.
//
// - The function ensures idempotency by deleting existing DDL entries before recalculating and inserting new ones.
// - This function can only be executed if the branch status is "preparing", "prepared", "merged", or "created".
//
// Parameters:
// - name: The name of the branch.
// - status: The current status of the branch. It must be one of "preparing", "prepared", "merged", or "created".
// - includeDatabases: A list of databases to include in the operation.
// - excludeDatabases: A list of databases to exclude from the operation.
// - mergeOption: Determines the merge strategy. Possible values:
// - MergeOverride: Attempts to override the source branch with the target branch.
// - MergeDiff: Attempts to merge the modifications of the target branch relative to a snapshot into the source branch. Uses a three-way merge algorithm to detect conflicts. Returns an error if conflicts are detected.
//
// - hints: Optional hints to guide the schema difference calculation process.
// Returns:
// - BranchDiff: The calculated DDL operations required for the merge-back.
// - error: An error if any step of the process fails.
func (bs *BranchService) BranchPrepareMergeBack(name string, status BranchStatus, includeDatabases, excludeDatabases []string, mergeOption MergeBackOption, hints *schemadiff.DiffHints) (*BranchDiff, error) {
if mergeOption != MergeOverride && mergeOption != MergeDiff {
return nil, fmt.Errorf("%v is invalid merge option, should be one of %v or %v", mergeOption, MergeOverride, MergeDiff)
Expand Down Expand Up @@ -305,31 +317,44 @@ func (bs *BranchService) BranchPrepareMergeBack(name string, status BranchStatus
return ddls, nil
}

// todo make it Idempotence
// todo enhancement: track whether the current ddl to apply has finished or is executing
// 幂等性:确保执行prepare merge中记录的ddl,crash后再次执行时,从上次没有执行的DDL继续。
// 难点:crash时,发送的那条DDL到底执行与否。有办法解决。但先记为todo,因为不同mysql协议数据库的解决方案不同。
// merge完成后,更新snapshot。
// BranchMergeBack executes the prepared DDLs for merging target branch back into source branch in an idempotent manner.
// If the operation crashes or is interrupted, it ensures that subsequent executions will continue from the last uncompleted DDL.
//
// Parameters:
// - name: The name of the target branch for the merge-back operation.
// - status: The current status of the branch. It must be one of the following:
// - StatusPrepared: Indicates that the branch has been prepared for merging.
// - StatusMerging: Indicates that the merge operation is already in progress.
//
// Returns:
// - error: An error if any step of the process fails.
//
// Notes:
// - The function is designed to ensure idempotency, meaning it can handle interruptions and crashes gracefully.
// - Enhancements planned for the future include:
// - Tracking whether the current DDL being applied has finished or is still executing.
// - Updating the snapshot to allow merging more than once.
// - Supporting multi-version snapshots.
func (bs *BranchService) BranchMergeBack(name string, status BranchStatus) error {

// 状态检查,只有prepared或者Merging才能执行
// check status
if !statusIsOneOf(status, []BranchStatus{StatusPrepared, StatusMerging}) {
return fmt.Errorf("%v is invalid Status, should be one of %v or %v", status, StatusPrepared, StatusMerging)
}

// 将status改为Merging
// set status Merging
err := bs.targetMySQLService.UpdateBranchStatus(name, StatusMerging)
if err != nil {
return err
}

// 逐一获取,执行,记录ddl执行
// select and execute merge back ddl one by one
// todo enhancement: track whether the current ddl to apply has finished or is executing
err = bs.executeMergeBackDDLOneByOne(name)
if err != nil {
return err
}

// 执行完毕后更新状态
// set status Merged
return bs.targetMySQLService.UpdateBranchStatus(name, StatusMerged)

// todo enhancement: update snapshot to support merging more than once
Expand Down Expand Up @@ -430,9 +455,8 @@ func (bs *BranchService) getMergeBackMergeDiffDDLs(name string, includeDatabases
// 5. If both are valid but diff1(diff2(snapshot)) is not equal to diff2(diff1(snapshot)), there's a conflict.
// 6. If both transformations are valid, and diff1(diff2(snapshot)) equals diff2(diff1(snapshot)), it signifies there's no conflict.
//
// 函数的输入是branchSchema,有多个数据库的schema
// Notes: The parameters are BranchSchema Type, which concludes the multi database schema, it's an instance-level schema.
// todo add UT
// todo 注释
func branchSchemasConflictCheck(branchSchema1, branchSchema2, snapshot *BranchSchema, hints *schemadiff.DiffHints) (bool, string, error) {
branchDiffFromSnapshotToSchema1, err := getBranchSchemaDiff(snapshot, branchSchema1, hints)
if err != nil {
Expand All @@ -442,27 +466,27 @@ func branchSchemasConflictCheck(branchSchema1, branchSchema2, snapshot *BranchSc
if err != nil {
return false, "", err
}
DatabaseSchemas1, err := branchSchemaToDatabaseSchemas(branchSchema1)
instanceSchema1, err := branchSchemaToInstanceSchema(branchSchema1)
if err != nil {
return false, "", err
}
DatabaseSchemas2, err := branchSchemaToDatabaseSchemas(branchSchema2)
instanceSchema2, err := branchSchemaToInstanceSchema(branchSchema2)
if err != nil {
return false, "", err
}

// diff1 应用到 bschema2上:
NewDatabaseSchemas2, err := applyBranchDiffToDatabaseSchemas(DatabaseSchemas2, branchDiffFromSnapshotToSchema1)
// apply diff1 to schema2
NewInstanceSchema2, err := applyBranchDiffToInstanceSchema(instanceSchema2, branchDiffFromSnapshotToSchema1)
if err != nil {
return false, "", err
}
// 同理将diff2应用到bschema1
NewDatabaseSchemas1, err := applyBranchDiffToDatabaseSchemas(DatabaseSchemas1, branchDiffFromSnapshotToSchema2)
// apply diff2 to schema1
NewInstanceSchema1, err := applyBranchDiffToInstanceSchema(instanceSchema1, branchDiffFromSnapshotToSchema2)
if err != nil {
return false, "", err
}
// 比较两个branch schema中的每一个schema是否都相同
equal, message, err := databaseSchemasEqual(NewDatabaseSchemas1, NewDatabaseSchemas2, hints)
// compare two instance-level schemas
equal, message, err := instanceSchemaEqual(NewInstanceSchema1, NewInstanceSchema2, hints)
if err != nil {
return false, "", err
}
Expand All @@ -473,7 +497,7 @@ func branchSchemasConflictCheck(branchSchema1, branchSchema2, snapshot *BranchSc
return false, "", nil
}

func branchSchemaToDatabaseSchemas(branchSchema *BranchSchema) (map[string]*schemadiff.Schema, error) {
func branchSchemaToInstanceSchema(branchSchema *BranchSchema) (map[string]*schemadiff.Schema, error) {
databaseSchemas := make(map[string]*schemadiff.Schema)
for dbName, databaseSchemaInMap := range branchSchema.branchSchema {
tableSchemaQueries := make([]string, 0)
Expand All @@ -490,29 +514,29 @@ func branchSchemaToDatabaseSchemas(branchSchema *BranchSchema) (map[string]*sche
return databaseSchemas, nil
}

// todo comment
// todo add UT
func applyBranchDiffToDatabaseSchemas(databaseSchemas map[string]*schemadiff.Schema, branchDiff *BranchDiff) (map[string]*schemadiff.Schema, error) {
func applyBranchDiffToInstanceSchema(instanceSchema map[string]*schemadiff.Schema, branchDiff *BranchDiff) (map[string]*schemadiff.Schema, error) {
resultDatabaseSchemas := make(map[string]*schemadiff.Schema)
for database, databaseDiff := range branchDiff.Diffs {
// 处理删除数据库的情况
// need drop database
if databaseDiff.NeedDropDatabase {
if _, exists := databaseSchemas[database]; !exists {
return nil, fmt.Errorf("database %s not exist in databaseSchemas, can not drop database", database)
if _, exists := instanceSchema[database]; !exists {
return nil, fmt.Errorf("database %s not exist in instanceSchema, can not drop database", database)
}
// this database will not occur in the result, skip
continue
}

// 处理创建一个新数据库的情况
// need create new database
if databaseDiff.NeedCreateDatabase {
if _, exists := databaseSchemas[database]; exists {
return nil, fmt.Errorf("database %s already exist in databaseSchemas, can not create database", database)
if _, exists := instanceSchema[database]; exists {
return nil, fmt.Errorf("database %s already exist in instanceSchema, can not create database", database)
}

createTableQueries := make([]string, 0)
for _, createTableQuery := range databaseDiff.TableDDLs {
// 虽然这里是数组,但是由于每张表都不存在,因此实际上数组只有一个元素,且都是create table语句
// Although this appears to be an array, since each table doesn't exist,
// the array actually only contains one element, which is a CREATE TABLE statement.
createTableQueries = append(createTableQueries, createTableQuery...)
}
databaseSchema, err := schemadiff.NewSchemaFromQueries(createTableQueries)
Expand All @@ -523,13 +547,13 @@ func applyBranchDiffToDatabaseSchemas(databaseSchemas map[string]*schemadiff.Sch
continue
}

// 处理已有数据库的情况
if _, exists := databaseSchemas[database]; !exists {
return nil, fmt.Errorf("database %s not exist in databaseSchemas, can not modify database", database)
// deal with existing database
if _, exists := instanceSchema[database]; !exists {
return nil, fmt.Errorf("database %s not exist in instanceSchema, can not modify database", database)
}

// 将该数据库的diff应用到
databaseSchema := databaseSchemas[database]
// apply diff to database
databaseSchema := instanceSchema[database]
entityDiffs := make([]schemadiff.EntityDiff, 0)
for _, entityDiff := range databaseDiff.tableEntityDiffs {
entityDiffs = append(entityDiffs, entityDiff)
Expand All @@ -544,14 +568,14 @@ func applyBranchDiffToDatabaseSchemas(databaseSchemas map[string]*schemadiff.Sch
}

// todo UT
func databaseSchemasEqual(databaseSchemas1, databaseSchemas2 map[string]*schemadiff.Schema, hints *schemadiff.DiffHints) (bool, string, error) {
if len(databaseSchemas1) != len(databaseSchemas2) {
func instanceSchemaEqual(instanceSchema1, instanceSchema2 map[string]*schemadiff.Schema, hints *schemadiff.DiffHints) (bool, string, error) {
if len(instanceSchema1) != len(instanceSchema2) {
return false, fmt.Sprintf("number of databases not equal: %d != %d",
len(databaseSchemas1), len(databaseSchemas2)), nil
len(instanceSchema1), len(instanceSchema2)), nil
}

for dbName, schema1 := range databaseSchemas1 {
schema2, exists := databaseSchemas2[dbName]
for dbName, schema1 := range instanceSchema1 {
schema2, exists := instanceSchema2[dbName]
if !exists {
return false, fmt.Sprintf("database %s exists in source but not in target", dbName), nil
}
Expand Down

0 comments on commit f6cda92

Please sign in to comment.