Skip to content

Commit

Permalink
fix missing workflow types for deprecated code
Browse files Browse the repository at this point in the history
Signed-off-by: Vilius Okockis <[email protected]>
  • Loading branch information
DeathBorn committed Apr 22, 2024
1 parent cddc9e6 commit a81f1d1
Showing 1 changed file with 5 additions and 3 deletions.
8 changes: 5 additions & 3 deletions go/vt/wrangler/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func (wr *Wrangler) SplitClone(ctx context.Context, keyspace string, from, to []
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName(), binlogdatapb.VReplicationWorkflowType_Reshard)
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
Expand Down Expand Up @@ -227,7 +227,7 @@ func (wr *Wrangler) VerticalSplitClone(ctx context.Context, fromKeyspace, toKeys
Shard: source.ShardName(),
Filter: filter,
}
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName())
cmd := binlogplayer.CreateVReplicationState("VSplitClone", bls, "", binlogplayer.BlpStopped, master.DbName(), binlogdatapb.VReplicationWorkflowType_MoveTables)
qr, err := wr.TabletManagerClient().VReplicationExec(ctx, master.Tablet, cmd)
if err != nil {
return vterrors.Wrapf(err, "VReplicationExec(%v, %s) failed", dest.MasterAlias, cmd)
Expand Down Expand Up @@ -867,6 +867,7 @@ func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, d
if kr == nil {
kr = &topodatapb.KeyRange{}
}

// Create replications streams first using the retrieved master positions.
uids := make([]uint32, len(destinationShards))
for j, dest := range destinationShards {
Expand All @@ -875,13 +876,14 @@ func (wr *Wrangler) setupReverseReplication(ctx context.Context, sourceShards, d
Shard: dest.ShardName(),
KeyRange: kr,
}
qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationState("ReversedResharding", bls, masterPositions[j], binlogplayer.BlpStopped, dbName))
qr, err := wr.VReplicationExec(ctx, sourceShard.MasterAlias, binlogplayer.CreateVReplicationState("ReversedResharding", bls, masterPositions[j], binlogplayer.BlpStopped, dbName, binlogdatapb.VReplicationWorkflowType_Reshard))
if err != nil {
return err
}
uids[j] = uint32(qr.InsertId)
wr.Logger().Infof("Created reverse replication for tablet %v/%v: %v, db: %v, pos: %v, uid: %v", sourceShard.Keyspace(), sourceShard.ShardName(), bls, dbName, masterPositions[j], uids[j])
}

// Source shards have to be atomically added to ensure idempotence.
// If this fails, there's no harm because the unstarted vreplication streams will just be abandoned.
sourceShards[i], err = wr.ts.UpdateShardFields(ctx, sourceShard.Keyspace(), sourceShard.ShardName(), func(si *topo.ShardInfo) error {
Expand Down

0 comments on commit a81f1d1

Please sign in to comment.