Skip to content

Commit

Permalink
Move VDiff related workflow server APIs to vdiff.go and add unit te…
Browse files Browse the repository at this point in the history
…sts (#17466)

Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 authored Jan 10, 2025
1 parent 9b0bbad commit 54dfd60
Show file tree
Hide file tree
Showing 4 changed files with 646 additions and 537 deletions.
284 changes: 0 additions & 284 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,13 @@ import (
"context"
"errors"
"fmt"
"math"
"slices"
"sort"
"strings"
"sync"
"text/template"
"time"

"github.com/google/uuid"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc/codes"
Expand All @@ -41,7 +39,6 @@ import (
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
Expand Down Expand Up @@ -1249,287 +1246,6 @@ func (s *Server) ReshardCreate(ctx context.Context, req *vtctldatapb.ReshardCrea
})
}

// VDiffCreate is part of the vtctlservicepb.VtctldServer interface.
// It passes on the request to the target primary tablets that are
// participating in the given workflow and VDiff.
func (s *Server) VDiffCreate(ctx context.Context, req *vtctldatapb.VDiffCreateRequest) (*vtctldatapb.VDiffCreateResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffCreate")
defer span.Finish()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("uuid", req.Uuid)
span.Annotate("source_cells", req.SourceCells)
span.Annotate("target_cells", req.TargetCells)
span.Annotate("tablet_types", req.TabletTypes)
span.Annotate("tables", req.Tables)
span.Annotate("auto_retry", req.AutoRetry)
span.Annotate("max_diff_duration", req.MaxDiffDuration)
if req.AutoStart != nil {
span.Annotate("auto_start", req.GetAutoStart())
}

var err error
req.Uuid = strings.TrimSpace(req.Uuid)
if req.Uuid == "" { // Generate a UUID
req.Uuid = uuid.New().String()
} else { // Validate UUID if provided
if err = uuid.Validate(req.Uuid); err != nil {
return nil, vterrors.Wrapf(err, "invalid UUID provided: %s", req.Uuid)
}
}

tabletTypesStr := discovery.BuildTabletTypesString(req.TabletTypes, req.TabletSelectionPreference)

if req.Limit == 0 { // This would produce no useful results
req.Limit = math.MaxInt64
}
// This is a pointer so there's no ZeroValue in the message
// and an older v18 client will not provide it.
if req.MaxDiffDuration == nil {
req.MaxDiffDuration = &vttimepb.Duration{}
}
// The other vttime.Duration vars should not be nil as the
// client should always provide them, but we check anyway to
// be safe.
if req.FilteredReplicationWaitTime == nil {
// A value of 0 is not valid as the vdiff will never succeed.
req.FilteredReplicationWaitTime = &vttimepb.Duration{
Seconds: int64(DefaultTimeout.Seconds()),
}
}
if req.WaitUpdateInterval == nil {
req.WaitUpdateInterval = &vttimepb.Duration{}
}

autoStart := true
if req.AutoStart != nil {
autoStart = req.GetAutoStart()
}

options := &tabletmanagerdatapb.VDiffOptions{
PickerOptions: &tabletmanagerdatapb.VDiffPickerOptions{
TabletTypes: tabletTypesStr,
SourceCell: strings.Join(req.SourceCells, ","),
TargetCell: strings.Join(req.TargetCells, ","),
},
CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{
Tables: strings.Join(req.Tables, ","),
AutoRetry: req.AutoRetry,
MaxRows: req.Limit,
TimeoutSeconds: req.FilteredReplicationWaitTime.Seconds,
MaxExtraRowsToCompare: req.MaxExtraRowsToCompare,
UpdateTableStats: req.UpdateTableStats,
MaxDiffSeconds: req.MaxDiffDuration.Seconds,
AutoStart: &autoStart,
},
ReportOptions: &tabletmanagerdatapb.VDiffReportOptions{
OnlyPks: req.OnlyPKs,
DebugQuery: req.DebugQuery,
MaxSampleRows: req.MaxReportSampleRows,
RowDiffColumnTruncateAt: req.RowDiffColumnTruncateAt,
},
}

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.CreateAction),
Options: options,
VdiffUuid: req.Uuid,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}
if ts.frozen {
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "invalid VDiff run: writes have been already been switched for workflow %s.%s",
req.TargetKeyspace, req.Workflow)
}

workflowStatus, err := s.getWorkflowStatus(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}
if workflowStatus != binlogdatapb.VReplicationWorkflowState_Running {
s.Logger().Infof("Workflow %s.%s is not running, cannot start VDiff in state %s", req.TargetKeyspace, req.Workflow, workflowStatus)
return nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"not all streams are running in workflow %s.%s", req.TargetKeyspace, req.Workflow)
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
})
if err != nil {
s.Logger().Errorf("Error executing vdiff create action: %v", err)
return nil, err
}

return &vtctldatapb.VDiffCreateResponse{
UUID: req.Uuid,
}, nil
}

// VDiffDelete is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffDelete(ctx context.Context, req *vtctldatapb.VDiffDeleteRequest) (*vtctldatapb.VDiffDeleteResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffDelete")
defer span.Finish()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("argument", req.Arg)

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.DeleteAction),
ActionArg: req.Arg,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
})
if err != nil {
s.Logger().Errorf("Error executing vdiff delete action: %v", err)
return nil, err
}

return &vtctldatapb.VDiffDeleteResponse{}, nil
}

// VDiffResume is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffResume(ctx context.Context, req *vtctldatapb.VDiffResumeRequest) (*vtctldatapb.VDiffResumeResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffResume")
defer span.Finish()

targetShards := req.GetTargetShards()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("uuid", req.Uuid)
span.Annotate("target_shards", targetShards)

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.ResumeAction),
VdiffUuid: req.Uuid,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}

if len(targetShards) > 0 {
if err := applyTargetShards(ts, targetShards); err != nil {
return nil, err
}
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
})
if err != nil {
s.Logger().Errorf("Error executing vdiff resume action: %v", err)
return nil, err
}

return &vtctldatapb.VDiffResumeResponse{}, nil
}

// VDiffShow is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffShow(ctx context.Context, req *vtctldatapb.VDiffShowRequest) (*vtctldatapb.VDiffShowResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffShow")
defer span.Finish()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("argument", req.Arg)

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.ShowAction),
ActionArg: req.Arg,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}

output := &vdiffOutput{
responses: make(map[string]*tabletmanagerdatapb.VDiffResponse, len(ts.targets)),
err: nil,
}
output.err = ts.ForAllTargets(func(target *MigrationTarget) error {
resp, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
output.mu.Lock()
defer output.mu.Unlock()
output.responses[target.GetShard().ShardName()] = resp
return err
})
if output.err != nil {
s.Logger().Errorf("Error executing vdiff show action: %v", output.err)
return nil, output.err
}
return &vtctldatapb.VDiffShowResponse{
TabletResponses: output.responses,
}, nil
}

// VDiffStop is part of the vtctlservicepb.VtctldServer interface.
func (s *Server) VDiffStop(ctx context.Context, req *vtctldatapb.VDiffStopRequest) (*vtctldatapb.VDiffStopResponse, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.VDiffStop")
defer span.Finish()

targetShards := req.GetTargetShards()

span.Annotate("keyspace", req.TargetKeyspace)
span.Annotate("workflow", req.Workflow)
span.Annotate("uuid", req.Uuid)
span.Annotate("target_shards", targetShards)

tabletreq := &tabletmanagerdatapb.VDiffRequest{
Keyspace: req.TargetKeyspace,
Workflow: req.Workflow,
Action: string(vdiff.StopAction),
VdiffUuid: req.Uuid,
}

ts, err := s.buildTrafficSwitcher(ctx, req.TargetKeyspace, req.Workflow)
if err != nil {
return nil, err
}

if len(targetShards) > 0 {
if err := applyTargetShards(ts, targetShards); err != nil {
return nil, err
}
}

err = ts.ForAllTargets(func(target *MigrationTarget) error {
_, err := s.tmc.VDiff(ctx, target.GetPrimary().Tablet, tabletreq)
return err
})
if err != nil {
s.Logger().Errorf("Error executing vdiff stop action: %v", err)
return nil, err
}

return &vtctldatapb.VDiffStopResponse{}, nil
}

// WorkflowDelete is part of the vtctlservicepb.VtctldServer interface.
// It passes on the request to the target primary tablets that are
// participating in the given workflow.
Expand Down
Loading

0 comments on commit 54dfd60

Please sign in to comment.