Skip to content

Commit

Permalink
refac: Remove duplicate function
Browse files Browse the repository at this point in the history
Signed-off-by: Noble Mittal <[email protected]>
  • Loading branch information
beingnoble03 committed Nov 11, 2024
1 parent 88adf99 commit fc4f0c7
Showing 1 changed file with 4 additions and 49 deletions.
53 changes: 4 additions & 49 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -558,51 +558,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
return ts, state, nil
}

func (s *Server) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) {
span, ctx := trace.NewSpan(ctx, "workflow.Server.getWorkflowCopyStates")
defer span.Finish()

span.Annotate("keyspace", tablet.Keyspace)
span.Annotate("shard", tablet.Shard)
span.Annotate("tablet_alias", tablet.AliasString())
span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds))

idsBV, err := sqltypes.BuildBindVariable(streamIds)
if err != nil {
return nil, err
}
query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)",
idsBV, idsBV)
if err != nil {
return nil, err
}
qr, err := s.tmc.VReplicationExec(ctx, tablet.Tablet, query)
if err != nil {
return nil, err
}

result := sqltypes.Proto3ToResult(qr)
if result == nil {
return nil, nil
}

copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows))
for i, row := range result.Rows {
streamId, err := row[0].ToInt64()
if err != nil {
return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err)
}
// These string fields are technically varbinary, but this is close enough.
copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{
StreamId: streamId,
Table: row[1].ToString(),
LastPk: row[2].ToString(),
}
}

return copyStates, nil
}

// LookupVindexCreate creates the lookup vindex in the specified
// keyspace and creates a VReplication workflow to backfill that
// vindex from the keyspace to the target/lookup table specified.
Expand Down Expand Up @@ -1023,7 +978,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
}
}
if isStandardMoveTables() { // Non-standard ones do not use shard scoped mechanisms
if err := s.setupInitialDeniedTables(ctx, ts); err != nil {
if err := setupInitialDeniedTables(ctx, ts); err != nil {
return nil, vterrors.Wrapf(err, "failed to put initial denied tables entries in place on the target shards")
}
}
Expand Down Expand Up @@ -1078,7 +1033,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
})
}

func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error {
func validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateRequest, mz *materializer) error {
if mz.IsMultiTenantMigration() {
switch {
case req.NoRoutingRules:
Expand All @@ -1090,7 +1045,7 @@ func (s *Server) validateRoutingRuleFlags(req *vtctldatapb.MoveTablesCreateReque
return nil
}

func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error {
func setupInitialDeniedTables(ctx context.Context, ts *trafficSwitcher) error {
if ts.MigrationType() != binlogdatapb.MigrationType_TABLES {
return nil
}
Expand All @@ -1108,7 +1063,7 @@ func (s *Server) setupInitialDeniedTables(ctx context.Context, ts *trafficSwitch
}

func (s *Server) setupInitialRoutingRules(ctx context.Context, req *vtctldatapb.MoveTablesCreateRequest, mz *materializer, tables []string) error {
if err := s.validateRoutingRuleFlags(req, mz); err != nil {
if err := validateRoutingRuleFlags(req, mz); err != nil {
return err
}

Expand Down

0 comments on commit fc4f0c7

Please sign in to comment.