Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <[email protected]>
  • Loading branch information
mattlord committed Jan 3, 2025
1 parent e1832b2 commit e89ca17
Show file tree
Hide file tree
Showing 17 changed files with 64 additions and 47 deletions.
8 changes: 6 additions & 2 deletions go/cmd/vtctldclient/command/permissions.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,20 @@ var (
Args: cobra.ExactArgs(1),
RunE: commandGetPermissions,
}
// ValidatePermissionsShard makes a ValidatePermissionsKeyspace gRPC call to a
// vtctld with the specified shard to examine in the keyspace.
ValidatePermissionsShard = &cobra.Command{
Use: "ValidatePermissionsShard <keyspace/shard>",
Short: "Validates that the permissions on primary match all the replicas.",
Short: "Validates that the permissions on the primary match all of the replicas.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandValidatePermissionsShard,
}
// ValidatePermissionsKeyspace makes a ValidatePermissionsKeyspace gRPC call to a
// vtctld.
ValidatePermissionsKeyspace = &cobra.Command{
Use: "ValidatePermissionsKeyspace <keyspace name>",
Short: "Validates that the permissions on primary of the first shard match those of all of the other tablets in the keyspace.",
Short: "Validates that the permissions on the primary of the first shard match those of all of the other tablets in the keyspace.",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandValidatePermissionsKeyspace,
Expand Down
12 changes: 6 additions & 6 deletions go/cmd/vtctldclient/command/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ For --sql, semi-colons and repeated values may be mixed, for example:
Args: cobra.ExactArgs(1),
RunE: commandValidateSchemaKeyspace,
}
// ValidateSchemaShard makes a ValidateSchemaKeyspace gRPC call to a vtctld WITH
// 1 specific shard to examine in the keyspace.
// ValidateSchemaShard makes a ValidateSchemaKeyspace gRPC call to a vtctld with
// the specified shard to examine in the keyspace.
ValidateSchemaShard = &cobra.Command{
Use: "ValidateSchemaShard [--exclude-tables=<exclude_tables>] [--include-views] [--skip-no-primary] [--include-vschema] <keyspace/shard>",
Short: "Validates that the schema on the primary tablet for the specified shard matches the schema on all other tablets in that shard.",
Expand Down Expand Up @@ -190,13 +190,13 @@ var copySchemaShardOptions = struct {
}{}

func commandCopySchemaShard(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

destKeyspace, destShard, err := topoproto.ParseKeyspaceShard(cmd.Flags().Arg(1))
if err != nil {
return err
}

cli.FinishedParsing(cmd)

var sourceTabletAlias *topodatapb.TabletAlias
sourceKeyspace, sourceShard, err := topoproto.ParseKeyspaceShard(cmd.Flags().Arg(0))
if err == nil {
Expand Down Expand Up @@ -377,9 +377,9 @@ var validateSchemaKeyspaceOptions = struct {
func commandValidateSchemaKeyspace(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

ks := cmd.Flags().Arg(0)
keyspace := cmd.Flags().Arg(0)
resp, err := client.ValidateSchemaKeyspace(commandCtx, &vtctldatapb.ValidateSchemaKeyspaceRequest{
Keyspace: ks,
Keyspace: keyspace,
ExcludeTables: validateSchemaKeyspaceOptions.ExcludeTables,
IncludeVschema: validateSchemaKeyspaceOptions.IncludeVSchema,
SkipNoPrimary: validateSchemaKeyspaceOptions.SkipNoPrimary,
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtctldclient/command/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func commandWriteTopologyPath(cmd *cobra.Command, args []string) error {
}
data, err := os.ReadFile(file)
if err != nil {
return fmt.Errorf("failed to read %s: %v", file, err)
return fmt.Errorf("failed to read file %s: %v", file, err)
}
_, err = conn.Update(cmd.Context(), path, data, nil)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion go/cmd/vtctldclient/plugin_grpctmclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@ limitations under the License.

package main

// Imports and register the gRPC tabletmanager client
// Imports and registers the gRPC tabletmanager client.
// This is needed when --server=internal as the vtctldclient
// binary will then not only need to talk to the topo server
// directly but it will also need to talk to tablets directly
// via tmclient.

import (
_ "vitess.io/vitess/go/vt/vttablet/grpctmclient"
Expand Down
4 changes: 2 additions & 2 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ Available Commands:
VDiff Perform commands related to diffing tables involved in a VReplication workflow between the source and target.
Validate Validates that all nodes reachable from the global replication graph, as well as all tablets in discoverable cells, are consistent.
ValidateKeyspace Validates that all nodes reachable from the specified keyspace are consistent.
ValidatePermissionsKeyspace Validates that the permissions on primary of the first shard match those of all of the other tablets in the keyspace.
ValidatePermissionsShard Validates that the permissions on primary match all the replicas.
ValidatePermissionsKeyspace Validates that the permissions on the primary of the first shard match those of all of the other tablets in the keyspace.
ValidatePermissionsShard Validates that the permissions on the primary match all of the replicas.
ValidateSchemaKeyspace Validates that the schema on the primary tablet for the first shard matches the schema on all other tablets in the keyspace.
ValidateSchemaShard Validates that the schema on the primary tablet for the specified shard matches the schema on all other tablets in that shard.
ValidateShard Validates that all nodes reachable from the specified shard are consistent.
Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1292,25 +1292,25 @@ func (cluster *LocalProcessCluster) NewVttabletInstance(tabletType string, UID i
func (cluster *LocalProcessCluster) NewVTOrcProcess(config VTOrcConfiguration) *VTOrcProcess {
base := VtProcessInstance("vtorc", "vtorc", cluster.TopoProcess.Port, cluster.Hostname)
return &VTOrcProcess{
VtProcess: *base,
VtProcess: base,
LogDir: cluster.TmpDirectory,
Config: config,
Port: cluster.GetAndReservePort(),
}
}

// VtctldClientProcessInstance returns a VtctldProcess handle for vtctldclient process
// configured with the given Config.
// VtctldClientProcessInstance returns a VtctldProcess handle for a
// vtctldclient process configured with the given Config.
func (cluster *LocalProcessCluster) NewVtctldClientProcessInstance(hostname string, grpcPort int, tmpDirectory string) *VtctldClientProcess {
version, err := GetMajorVersion("vtctldclient")
if err != nil {
log.Warningf("failed to get major vtctldclient version; interop with CLI changes for VEP-4 may not work: %s", err)
log.Warningf("failed to get major vtctldclient version; interop with CLI changes for VEP-4 may not work: %v", err)
}

base := VtProcessInstance("vtctldclient", "vtctldclient", cluster.TopoProcess.Port, cluster.Hostname)

vtctldclient := &VtctldClientProcess{
VtProcess: *base,
VtProcess: base,
Server: fmt.Sprintf("%s:%d", hostname, grpcPort),
TempDirectory: tmpDirectory,
VtctldClientMajorVersion: version,
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/cluster/vt_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type VtProcess struct {

// VtProcessInstance returns a VtProcess handle configured with the given Config.
// The process must be manually started by calling setup()
func VtProcessInstance(name, binary string, topoPort int, hostname string) *VtProcess {
func VtProcessInstance(name, binary string, topoPort int, hostname string) VtProcess {
// Default values for etcd2 topo server.
topoImplementation := "etcd2"
topoRootPath := "/"
Expand All @@ -49,7 +49,7 @@ func VtProcessInstance(name, binary string, topoPort int, hostname string) *VtPr
topoRootPath = ""
}

vt := &VtProcess{
vt := VtProcess{
Name: name,
Binary: binary,
TopoImplementation: topoImplementation,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtbackup_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func VtbackupProcessInstance(tabletUID int, mysqlPort int, newInitDBFile string,
cell string, hostname string, tmpDirectory string, topoPort int, initialBackup bool) *VtbackupProcess {
base := VtProcessInstance("vtbackup", "vtbackup", topoPort, hostname)
vtbackup := &VtbackupProcess{
VtProcess: *base,
VtProcess: base,
LogDir: tmpDirectory,
Directory: os.Getenv("VTDATAROOT"),
BackupStorageImplementation: "file",
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtctld_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ func (vtctld *VtctldProcess) TearDown() error {
func VtctldProcessInstance(httpPort int, grpcPort int, topoPort int, hostname string, tmpDirectory string) *VtctldProcess {
base := VtProcessInstance("vtctld", "vtctld", topoPort, hostname)
vtctld := &VtctldProcess{
VtProcess: *base,
VtProcess: base,
ServiceMap: "grpc-vtctl,grpc-vtctld",
BackupStorageImplementation: "file",
FileBackupStorageRoot: path.Join(os.Getenv("VTDATAROOT"), "/backups"),
Expand Down
9 changes: 6 additions & 3 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func VtctldClientProcessInstance(grpcPort int, topoPort int, hostname string, tm

base := VtProcessInstance("vtctldclient", "vtctldclient", topoPort, hostname)
vtctldclient := &VtctldClientProcess{
VtProcess: *base,
VtProcess: base,
Server: fmt.Sprintf("%s:%d", hostname, grpcPort),
TempDirectory: tmpDirectory,
VtctldClientMajorVersion: version,
Expand Down Expand Up @@ -116,6 +116,8 @@ func (vtctldclient *VtctldClientProcess) ExecuteCommandWithOutput(args ...string
}

// AddCellInfo executes the vtctldclient command to add cell info.
// It uses --server=internal as there may not yet be a vtctld running
// as we need to create a cell for vtctld to use first.
func (vtctldclient *VtctldClientProcess) AddCellInfo(Cell string) error {
args := []string{
"--server", "internal",
Expand Down Expand Up @@ -340,8 +342,9 @@ func (vtctldclient *VtctldClientProcess) OnlineDDLShow(keyspace, workflow string
)
}

// shouldRetry tells us if the command should be retried based on the results/output -- meaning that it
// is likely an ephemeral or recoverable issue that is likely to succeed when retried.
// shouldRetry tells us if the command should be retried based on the results/output
// -- meaning that it is likely an ephemeral or recoverable issue that is likely to
// succeed when retried.
func shouldRetry(cmdResults string) bool {
return strings.Contains(cmdResults, "Deadlock found when trying to get lock; try restarting transaction")
}
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtgate_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ func VtgateProcessInstance(
) *VtgateProcess {
base := VtProcessInstance("vtgate", "vtgate", topoPort, hostname)
vtgate := &VtgateProcess{
VtProcess: *base,
VtProcess: base,
FileToLogQueries: path.Join(tmpDirectory, "/vtgate_querylog.txt"),
ConfigFile: path.Join(tmpDirectory, fmt.Sprintf("vtgate-config-%d.json", port)),
Directory: os.Getenv("VTDATAROOT"),
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vtorc_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (orc *VTOrcProcess) Setup() (err error) {
--config config/vtorc/default.json --alsologtostderr
*/
orc.proc = exec.Command(
"vtorc",
orc.Binary,
"--topo_implementation", orc.TopoImplementation,
"--topo_global_server_address", orc.TopoGlobalAddress,
"--topo_global_root", orc.TopoGlobalRoot,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/vttablet_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ func (vttablet *VttabletProcess) IsShutdown() bool {
func VttabletProcessInstance(port, grpcPort, tabletUID int, cell, shard, keyspace string, vtctldPort int, tabletType string, topoPort int, hostname, tmpDirectory string, extraArgs []string, charset string) *VttabletProcess {
base := VtProcessInstance("vttablet", "vttablet", topoPort, hostname)
vttablet := &VttabletProcess{
VtProcess: *base,
VtProcess: base,
FileToLogQueries: path.Join(tmpDirectory, fmt.Sprintf("/vt_%010d_querylog.txt", tabletUID)),
Directory: path.Join(os.Getenv("VTDATAROOT"), fmt.Sprintf("/vt_%010d", tabletUID)),
Cell: cell,
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/reparent/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ func ErsIgnoreTablet(clusterInstance *cluster.LocalProcessCluster, tab *cluster.
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
}

// ErsWithVtctldClient runs ERS via vtctldclient binary
// ErsWithVtctldClient runs ERS via a vtctldclient binary.
func ErsWithVtctldClient(clusterInstance *cluster.LocalProcessCluster) (string, error) {
args := []string{"EmergencyReparentShard", fmt.Sprintf("%s/%s", KeyspaceName, ShardName)}
return clusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(args...)
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (vc *VitessCluster) StartVTOrc() error {
}
base := cluster.VtProcessInstance("vtorc", "vtorc", vc.ClusterConfig.topoPort, vc.ClusterConfig.hostname)
vtorcProcess := &cluster.VTOrcProcess{
VtProcess: *base,
VtProcess: base,
LogDir: vc.ClusterConfig.tmpDir,
Config: cluster.VTOrcConfiguration{},
Port: vc.ClusterConfig.vtorcPort,
Expand Down
20 changes: 11 additions & 9 deletions go/test/endtoend/vreplication/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,14 @@ func insertInitialDataIntoExternalCluster(t *testing.T, conn *mysql.Conn) {
})
}

// TestMigrateUnsharded runs an e2e test for importing from an external cluster using the vtctldclient Mount and Migrate commands.
// We have an anti-pattern in Vitess: vt executables look for an environment variable VTDATAROOT for certain cluster parameters
// like the log directory when they are created. Until this test we just needed a single cluster for e2e tests.
// However now we need to create an external Vitess cluster. For this we need a different VTDATAROOT and
// hence the VTDATAROOT env variable gets overwritten.
// Each time we need to create vt processes in the "other" cluster we need to set the appropriate VTDATAROOT
// TestMigrateUnsharded runs an e2e test for importing from an external cluster using the
// vtctldclient Mount and Migrate commands.We have an anti-pattern in Vitess: vt executables
// look for an environment variable VTDATAROOT for certain cluster parameters like the log
// directory when they are created. Until this test we just needed a single cluster for e2e
// tests. However now we need to create an external Vitess cluster. For this we need a
// different VTDATAROOT and hence the VTDATAROOT env variable gets overwritten. Each time
// we need to create vt processes in the "other" cluster we need to set the appropriate
// VTDATAROOT.
func TestMigrateUnsharded(t *testing.T) {
vc = NewVitessCluster(t, nil)
defer vc.TearDown()
Expand Down Expand Up @@ -188,9 +190,9 @@ func TestMigrateUnsharded(t *testing.T) {
})
}

// TestMigrateSharded adds a test for a sharded cluster to validate a fix for a bug where the target keyspace name
// doesn't match that of the source cluster. The test migrates from a cluster with keyspace customer to an "external"
// cluster with keyspace rating.
// TestMigrateSharded adds a test for a sharded cluster to validate a fix for a bug where
// the target keyspace name doesn't match that of the source cluster. The test migrates
// from a cluster with keyspace customer to an "external" cluster with keyspace rating.
func TestMigrateSharded(t *testing.T) {
setSidecarDBName("_vt")
currentWorkflowType = binlogdatapb.VReplicationWorkflowType_MoveTables
Expand Down
22 changes: 13 additions & 9 deletions go/vt/vtctl/grpcvtctldserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,9 +886,10 @@ func (s *VtctldServer) CopySchemaShard(ctx context.Context, req *vtctldatapb.Cop
return nil, err
}

return &vtctldatapb.CopySchemaShardResponse{}, s.ws.CopySchemaShard(ctx,
req.SourceTabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews, req.DestinationKeyspace, req.DestinationShard,
waitReplicasTimeout, req.SkipVerify)
err = s.ws.CopySchemaShard(ctx, req.SourceTabletAlias, req.Tables, req.ExcludeTables, req.IncludeViews,
req.DestinationKeyspace, req.DestinationShard, waitReplicasTimeout, req.SkipVerify)

return &vtctldatapb.CopySchemaShardResponse{}, err
}

// CreateKeyspace is part of the vtctlservicepb.VtctldServer interface.
Expand Down Expand Up @@ -4736,7 +4737,7 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
}

if len(shards) == 0 {
return nil, fmt.Errorf("no shards in keyspace %v", req.Keyspace)
return nil, fmt.Errorf("no shards found in keyspace %s", req.Keyspace)
}
sort.Strings(shards)

Expand All @@ -4758,7 +4759,7 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
}
referencePermissions := pres.Permissions

// Then diff the first tablet with all others.
// Then diff the first/reference tablet with all the others.
eg, egctx := errgroup.WithContext(ctx)
for _, shard := range shards {
eg.Go(func() error {
Expand All @@ -4770,15 +4771,16 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
if topoproto.TabletAliasEqual(alias, si.PrimaryAlias) {
continue
}
log.Infof("Gathering permissions for %v", topoproto.TabletAliasString(alias))
log.Infof("Gathering permissions for %s", topoproto.TabletAliasString(alias))
presp, err := s.GetPermissions(ctx, &vtctldatapb.GetPermissionsRequest{
TabletAlias: alias,
})
if err != nil {
return err
}

log.Infof("Diffing permissions for %s", topoproto.TabletAliasString(alias))
log.Infof("Diffing permissions between %s and %s", topoproto.TabletAliasString(referenceAlias),
topoproto.TabletAliasString(alias))
er := &concurrency.AllErrorRecorder{}
tmutils.DiffPermissions(topoproto.TabletAliasString(referenceAlias), referencePermissions,
topoproto.TabletAliasString(alias), presp.Permissions, er)
Expand All @@ -4792,11 +4794,13 @@ func (s *VtctldServer) ValidatePermissionsKeyspace(ctx context.Context, req *vtc
if err := eg.Wait(); err != nil {
return nil, fmt.Errorf("permissions diffs: %v", err)
}

return &vtctldatapb.ValidatePermissionsKeyspaceResponse{}, nil
}

// ValidateSchemaKeyspace is a part of the vtctlservicepb.VtctldServer interface.
// It will diff the schema from all the tablets in the keyspace.
// It will diff the schema between the tablets in all shards -- or a subset if
// any specific shards are specified -- within the keyspace.
func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldatapb.ValidateSchemaKeyspaceRequest) (resp *vtctldatapb.ValidateSchemaKeyspaceResponse, err error) {
span, ctx := trace.NewSpan(ctx, "VtctldServer.ValidateSchemaKeyspace")
defer span.Finish()
Expand All @@ -4819,7 +4823,7 @@ func (s *VtctldServer) ValidateSchemaKeyspace(ctx context.Context, req *vtctldat
// Otherwise we look at all the shards in the keyspace.
shards, err = s.ts.GetShardNames(ctx, keyspace)
if err != nil {
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%v) failed: %v", req.Keyspace, err))
resp.Results = append(resp.Results, fmt.Sprintf("TopologyServer.GetShardNames(%s) failed: %v", req.Keyspace, err))
err = nil
return resp, err
}
Expand Down

0 comments on commit e89ca17

Please sign in to comment.