diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index cbb1f37889c..c6c1eb0148e 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -15,6 +15,7 @@
 - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
 - **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
 - **[Update lite images to Debian Bookworm](#debian-bookworm)**
+ - **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
 - **[Support for Filtering Query logs on Error](#query-logs)**
 - **[Minor Changes](#minor-changes)**
 - **[VTTablet Flags](#flags-vttablet)**
@@ -133,6 +134,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit
 The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].
 
+### KeyRanges in `--clusters_to_watch` in VTOrc
+VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This is useful after a reshard, because the VTOrc instance no longer needs to be restarted to pick up the new shards.
+For example, if a VTOrc instance is configured to watch `ks/-80`, it watches all the shards that fall under the KeyRange `-80`. If a reshard splits `-80` into the new shards `-40` and `40-80`, the VTOrc instance automatically starts watching the new shards without needing a restart.
+Users can still specify exact key ranges; the new feature is fully backward compatible.
+
 ### Support for Filtering Query logs on Error
 The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index ca8083709e5..c9d77be6082 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -24,7 +24,7 @@ Flags:
       --bind-address string    Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
       --catch-sigpipe    catch and ignore SIGPIPE on stdout and stderr if specified
       --change-tablets-with-errant-gtid-to-drained    Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
-      --clusters_to_watch strings    Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
+      --clusters_to_watch strings    Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
      --config-file string    Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
       --config-file-not-found-handling ConfigFileNotFoundHandling    Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
       --config-name string    Name of the config file (without extension) to search for. (default "vtconfig")
(default "vtconfig") diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go index b71551dc6b7..f7ab52e0a68 100644 --- a/go/test/endtoend/topotest/consul/main_test.go +++ b/go/test/endtoend/topotest/consul/main_test.go @@ -26,7 +26,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/vt/log" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/require" @@ -221,6 +223,107 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method. +// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace and shard records should be seen. + require.Len(t, initRecords, 2) + var ksInfo *topo.KeyspaceInfo + var shardInfo *topo.ShardInfo + for _, record := range initRecords { + if record.KeyspaceInfo != nil { + ksInfo = record.KeyspaceInfo + } + if record.ShardInfo != nil { + shardInfo = record.ShardInfo + } + } + require.NotNil(t, ksInfo) + require.NotNil(t, shardInfo) + require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName()) + require.EqualValues(t, KeyspaceName, shardInfo.Keyspace()) + require.EqualValues(t, "0", shardInfo.ShardName()) + + // Create a new keyspace record and see that we receive an update. + newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Creating a shard should also trigger an update. + err = ts.CreateShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Nil(t, record.ShardInfo.Shard.PrimaryAlias) + + primaryAlias := &topodatapb.TabletAlias{ + Cell: cell, + Uid: 100, + } + // Updating a shard should also trigger an update. + _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryAlias + return nil + }) + require.NoError(t, err) + // Wait to receive an update from the watch. 
+ record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias) + + // Deleting a shard should also trigger an update. + err = ts.DeleteShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Error(t, record.Err) + + // Update the keyspace record and see that we receive an update. + func() { + ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords") + require.NoError(t, err) + defer unlock(&err) + ki.DurabilityPolicy = policy.DurabilityCrossCell + err = ts.UpdateKeyspace(ctx, ki) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy) +} + func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result { t.Helper() qr, err := conn.ExecuteFetch(query, 1000, true) diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go index b274219b41a..ac13e966689 100644 --- a/go/test/endtoend/topotest/etcd2/main_test.go +++ b/go/test/endtoend/topotest/etcd2/main_test.go @@ -26,7 +26,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/log" @@ -268,6 +270,107 @@ func TestNamedLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondCallerAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method. +// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace and shard records should be seen. + require.Len(t, initRecords, 2) + var ksInfo *topo.KeyspaceInfo + var shardInfo *topo.ShardInfo + for _, record := range initRecords { + if record.KeyspaceInfo != nil { + ksInfo = record.KeyspaceInfo + } + if record.ShardInfo != nil { + shardInfo = record.ShardInfo + } + } + require.NotNil(t, ksInfo) + require.NotNil(t, shardInfo) + require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName()) + require.EqualValues(t, KeyspaceName, shardInfo.Keyspace()) + require.EqualValues(t, "0", shardInfo.ShardName()) + + // Create a new keyspace record and see that we receive an update. 
+ newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Creating a shard should also trigger an update. + err = ts.CreateShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Nil(t, record.ShardInfo.Shard.PrimaryAlias) + + primaryAlias := &topodatapb.TabletAlias{ + Cell: cell, + Uid: 100, + } + // Updating a shard should also trigger an update. + _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryAlias + return nil + }) + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias) + + // Deleting a shard should also trigger an update. + err = ts.DeleteShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Error(t, record.Err) + + // Update the keyspace record and see that we receive an update. + func() { + ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords") + require.NoError(t, err) + defer unlock(&err) + ki.DurabilityPolicy = policy.DurabilityCrossCell + err = ts.UpdateKeyspace(ctx, ki) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy) +} + func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result { t.Helper() var res []*sqltypes.Result diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go index 95a2fc13894..83b257f1609 100644 --- a/go/test/endtoend/topotest/zk2/main_test.go +++ b/go/test/endtoend/topotest/zk2/main_test.go @@ -25,7 +25,9 @@ import ( topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils" "vitess.io/vitess/go/test/endtoend/utils" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vtctl/reparentutil/policy" "vitess.io/vitess/go/vt/log" @@ -197,6 +199,107 @@ func TestKeyspaceLocking(t *testing.T) { topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true) } +// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method. 
+// We test out different updates and see if we receive the correct update +// from the watch. +func TestWatchAllKeyspaceRecords(t *testing.T) { + // Create the topo server connection. + ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot) + require.NoError(t, err) + + watchCtx, watchCancel := context.WithCancel(context.Background()) + defer watchCancel() + initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx) + require.NoError(t, err) + + // Check that we have the initial records. + // The existing keyspace and shard records should be seen. + require.Len(t, initRecords, 2) + var ksInfo *topo.KeyspaceInfo + var shardInfo *topo.ShardInfo + for _, record := range initRecords { + if record.KeyspaceInfo != nil { + ksInfo = record.KeyspaceInfo + } + if record.ShardInfo != nil { + shardInfo = record.ShardInfo + } + } + require.NotNil(t, ksInfo) + require.NotNil(t, shardInfo) + require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName()) + require.EqualValues(t, KeyspaceName, shardInfo.Keyspace()) + require.EqualValues(t, "0", shardInfo.ShardName()) + + // Create a new keyspace record and see that we receive an update. + newKeyspaceName := "ksTest" + err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{ + KeyspaceType: topodatapb.KeyspaceType_NORMAL, + DurabilityPolicy: policy.DurabilitySemiSync, + }) + require.NoError(t, err) + defer func() { + err = ts.DeleteKeyspace(context.Background(), newKeyspaceName) + require.NoError(t, err) + }() + + // Wait to receive an update from the watch. + record := <-ch + require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName()) + require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy) + + // Creating a shard should also trigger an update. + err = ts.CreateShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Nil(t, record.ShardInfo.Shard.PrimaryAlias) + + primaryAlias := &topodatapb.TabletAlias{ + Cell: cell, + Uid: 100, + } + // Updating a shard should also trigger an update. + _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryAlias + return nil + }) + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias) + + // Deleting a shard should also trigger an update. + err = ts.DeleteShard(context.Background(), newKeyspaceName, "-") + require.NoError(t, err) + // Wait to receive an update from the watch. + record = <-ch + require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace()) + require.EqualValues(t, "-", record.ShardInfo.ShardName()) + require.Error(t, record.Err) + + // Update the keyspace record and see that we receive an update. 
+	func() {
+		ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
+		require.NoError(t, err)
+		ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
+		require.NoError(t, err)
+		defer unlock(&err)
+		ki.DurabilityPolicy = policy.DurabilityCrossCell
+		err = ts.UpdateKeyspace(ctx, ki)
+		require.NoError(t, err)
+	}()
+
+	// Wait to receive an update from the watch.
+	record = <-ch
+	require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+	require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+}
+
 func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
 	t.Helper()
 	var res []*sqltypes.Result
diff --git a/go/vt/key/key.go b/go/vt/key/key.go
index 89d956bd433..d951d123f0b 100644
--- a/go/vt/key/key.go
+++ b/go/vt/key/key.go
@@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
 	return &topodatapb.KeyRange{Start: start, End: end}
 }
 
+// NewCompleteKeyRange returns a KeyRange that covers the entire keyspace.
+func NewCompleteKeyRange() *topodatapb.KeyRange {
+	return &topodatapb.KeyRange{
+		Start: []byte{},
+		End:   []byte{},
+	}
+}
+
 // KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
 // it returns false.
 func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go
index 710bbee0653..3e2b6e1badf 100755
--- a/go/vt/topo/keyspace.go
+++ b/go/vt/topo/keyspace.go
@@ -51,6 +51,15 @@ type KeyspaceInfo struct {
 	*topodatapb.Keyspace
 }
 
+// NewKeyspaceInfo creates a new KeyspaceInfo.
+func NewKeyspaceInfo(name string, keyspace *topodatapb.Keyspace, version Version) *KeyspaceInfo {
+	return &KeyspaceInfo{
+		keyspace: name,
+		Keyspace: keyspace,
+		version:  version,
+	}
+}
+
 // KeyspaceName returns the keyspace name
 func (ki *KeyspaceInfo) KeyspaceName() string {
 	return ki.keyspace
@@ -430,3 +439,121 @@ func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string, error) {
 	}
 	return DirEntriesToStringArray(children), err
 }
+
+// WatchKeyspacePrefixData wraps the data we receive on the recursive watch channel.
+// Exactly one of KeyspaceInfo or ShardInfo is set per record; Err is set
+// alongside it when the corresponding node was deleted, or on its own when
+// the watch itself fails (the channel is closed right after).
+type WatchKeyspacePrefixData struct {
+	KeyspaceInfo *KeyspaceInfo
+	ShardInfo    *ShardInfo
+	Err          error
+}
+
+// WatchAllKeyspaceAndShardRecords will set a watch on the Keyspace prefix.
+// It has the same contract as conn.WatchRecursive, but it also unpacks the
+// contents into Keyspace and Shard objects.
+func (ts *Server) WatchAllKeyspaceAndShardRecords(ctx context.Context) ([]*WatchKeyspacePrefixData, <-chan *WatchKeyspacePrefixData, error) {
+	if err := ctx.Err(); err != nil {
+		return nil, nil, err
+	}
+
+	ctx, cancel := context.WithCancel(ctx)
+
+	// Set up a recursive watch on the KeyspacesPath.
+	current, wdChannel, err := ts.globalCell.WatchRecursive(ctx, KeyspacesPath)
+	if err != nil {
+		cancel()
+		return nil, nil, err
+	}
+	// Unpack the initial data.
+	initialRes, err := checkAndUnpackKeyspacePrefixRecord(current...)
+	if err != nil {
+		// Cancel the watch, drain channel.
+		cancel()
+		for range wdChannel {
+		}
+		return nil, nil, vterrors.Wrapf(err, "error unpacking initial Keyspace objects")
+	}
+
+	changes := make(chan *WatchKeyspacePrefixData, 10)
+	// The background routine reads any event from the watch channel,
+	// translates it, and sends it to the caller.
+	// If cancel() is called, the underlying WatchRecursive() code will
+	// send an ErrInterrupted and then close the channel. We'll
+	// just propagate that back to our caller.
+	go func() {
+		defer cancel()
+		defer close(changes)
+
+		for wd := range wdChannel {
+			if wd.Err != nil {
+				if IsErrType(wd.Err, NoNode) {
+					// One of the nodes was deleted.
+					// We have the path, so it will be processed like normal.
+					// The error is copied into the record to signal to the
+					// receiver that the node was deleted.
+				} else {
+					// Last error value, we're done.
+					// wdChannel will be closed right after
+					// this, no need to do anything.
+					changes <- &WatchKeyspacePrefixData{Err: wd.Err}
+					return
+				}
+			}
+
+			res, err := checkAndUnpackKeyspacePrefixRecord(wd)
+			if err != nil {
+				cancel()
+				for range wdChannel {
+				}
+				changes <- &WatchKeyspacePrefixData{Err: vterrors.Wrapf(err, "error unpacking object")}
+				return
+			}
+
+			// Each update will only have a single object, if at all.
+			// We get updates for all objects in the prefix, but we only
+			// care about the keyspace and shard objects.
+			if len(res) == 0 {
+				continue
+			}
+			changes <- res[0]
+		}
+	}()
+
+	return initialRes, changes, nil
+}
+
+// checkAndUnpackKeyspacePrefixRecord checks for Keyspace and Shard objects and unpacks them.
+func checkAndUnpackKeyspacePrefixRecord(wds ...*WatchDataRecursive) ([]*WatchKeyspacePrefixData, error) {
+	var res []*WatchKeyspacePrefixData
+	for _, wd := range wds {
+		fileDir, fileType := path.Split(wd.Path)
+		// Check the type of file.
+		// Shard files live at path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile).
+		switch fileType {
+		case KeyspaceFile:
+			// Unpack a keyspace record.
+			ksName := path.Base(fileDir)
+			value := &topodatapb.Keyspace{}
+			if err := value.UnmarshalVT(wd.Contents); err != nil {
+				return nil, err
+			}
+			res = append(res, &WatchKeyspacePrefixData{
+				Err:          wd.Err,
+				KeyspaceInfo: NewKeyspaceInfo(ksName, value, wd.Version),
+			})
+		case ShardFile:
+			shardName := path.Base(fileDir)
+			ksName := path.Base(path.Dir(path.Dir(path.Clean(fileDir))))
+			// Unpack a shard record.
+			value := &topodatapb.Shard{}
+			if err := value.UnmarshalVT(wd.Contents); err != nil {
+				return nil, err
+			}
+			res = append(res, &WatchKeyspacePrefixData{
+				Err:       wd.Err,
+				ShardInfo: NewShardInfo(ksName, shardName, value, wd.Version),
+			})
+		}
+	}
+	return res, nil
+}
diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go
index bfcb2f591a9..9de1afa514b 100644
--- a/go/vt/topo/keyspace_external_test.go
+++ b/go/vt/topo/keyspace_external_test.go
@@ -18,16 +18,20 @@ package topo_test
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"slices"
+	"sync"
 	"testing"
 
 	"github.com/stretchr/testify/require"
+	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/sqlescape"
 	"vitess.io/vitess/go/vt/key"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/memorytopo"
+	"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
 
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 )
@@ -219,3 +223,332 @@ func TestServerGetServingShards(t *testing.T) {
 		})
 	}
 }
+
+// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
+// We test out different updates and see if we receive the correct update
+// from the watch.
+func TestWatchAllKeyspaceRecords(t *testing.T) {
+	ksDef := &topodatapb.Keyspace{
+		KeyspaceType:     topodatapb.KeyspaceType_NORMAL,
+		DurabilityPolicy: policy.DurabilitySemiSync,
+		SidecarDbName:    "_vt",
+	}
+	tests := []struct {
+		name            string
+		setupFunc       func(t *testing.T, ts *topo.Server)
+		updateFunc      func(t *testing.T, ts *topo.Server)
+		wantInitRecords []*topo.WatchKeyspacePrefixData
+		wantChanRecords []*topo.WatchKeyspacePrefixData
+	}{
+		{
+			name: "Update Durability Policy in 1 Keyspace",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				ki, err := ts.GetKeyspace(context.Background(), "ks")
+				require.NoError(t, err)
+				ki.DurabilityPolicy = policy.DurabilityCrossCell
+				ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+				require.NoError(t, err)
+				defer unlock(&err)
+				err = ts.UpdateKeyspace(ctx, ki)
+				require.NoError(t, err)
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+			},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+						KeyspaceType:     topodatapb.KeyspaceType_NORMAL,
+						DurabilityPolicy: policy.DurabilityCrossCell,
+						SidecarDbName:    "_vt",
+					}, nil),
+				},
+			},
+		},
+		{
+			name: "New Keyspace Created",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+			},
+		},
+		{
+			name: "New Shard Created",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateShard(context.Background(), "ks", "-")
+				require.NoError(t, err)
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+			},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					ShardInfo: topo.NewShardInfo("ks", "-", &topodatapb.Shard{
+						KeyRange:         &topodatapb.KeyRange{},
+						IsPrimaryServing: true,
+					}, nil),
+				},
+			},
+		},
+		{
+			name: "Keyspace Deleted",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.DeleteKeyspace(context.Background(), "ks")
+				require.NoError(t, err)
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+			},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					Err:          topo.NewError(topo.NoNode, "keyspaces/ks/Keyspace"),
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{}, nil),
+				},
+			},
+		},
+		{
+			name: "Update KeyspaceType in 1 Keyspace",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				ki, err := ts.GetKeyspace(context.Background(), "ks")
+				require.NoError(t, err)
+				ki.KeyspaceType = topodatapb.KeyspaceType_SNAPSHOT
+				ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+				require.NoError(t, err)
+				defer unlock(&err)
+				err = ts.UpdateKeyspace(ctx, ki)
+				require.NoError(t, err)
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+			},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+						KeyspaceType:     topodatapb.KeyspaceType_SNAPSHOT,
+						DurabilityPolicy: policy.DurabilitySemiSync,
+						SidecarDbName:    "_vt",
+					}, nil),
+				},
+			},
+		},
+		{
+			name: "Multiple updates in multiple keyspaces",
+			setupFunc: func(t *testing.T, ts *topo.Server) {
+				err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+				require.NoError(t, err)
+				err = ts.CreateKeyspace(context.Background(), "ks2", ksDef)
+				require.NoError(t, err)
+				err = ts.CreateShard(context.Background(), "ks2", "-")
+				require.NoError(t, err)
+			},
+			updateFunc: func(t *testing.T, ts *topo.Server) {
+				func() {
+					ki, err := ts.GetKeyspace(context.Background(), "ks")
+					require.NoError(t, err)
+					ki.KeyspaceType = topodatapb.KeyspaceType_SNAPSHOT
+					ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+					require.NoError(t, err)
+					defer unlock(&err)
+					err = ts.UpdateKeyspace(ctx, ki)
+					require.NoError(t, err)
+				}()
+				func() {
+					ki, err := ts.GetKeyspace(context.Background(), "ks2")
+					require.NoError(t, err)
+					ki.DurabilityPolicy = policy.DurabilityCrossCell
+					ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks2", "test")
+					require.NoError(t, err)
+					defer unlock(&err)
+					err = ts.UpdateKeyspace(ctx, ki)
+					require.NoError(t, err)
+				}()
+				func() {
+					_, err := ts.UpdateShardFields(context.Background(), "ks2", "-", func(info *topo.ShardInfo) error {
+						info.IsPrimaryServing = false
+						return nil
+					})
+					require.NoError(t, err)
+				}()
+			},
+			wantInitRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+				},
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks2", ksDef, nil),
+				},
+				{
+					ShardInfo: topo.NewShardInfo("ks2", "-", &topodatapb.Shard{
+						KeyRange:         &topodatapb.KeyRange{},
+						IsPrimaryServing: true,
+					}, nil),
+				},
+			},
+			wantChanRecords: []*topo.WatchKeyspacePrefixData{
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+						KeyspaceType:     topodatapb.KeyspaceType_SNAPSHOT,
+						DurabilityPolicy: policy.DurabilitySemiSync,
+						SidecarDbName:    "_vt",
+					}, nil),
+				},
+				{
+					KeyspaceInfo: topo.NewKeyspaceInfo("ks2", &topodatapb.Keyspace{
+						KeyspaceType:     topodatapb.KeyspaceType_NORMAL,
+						DurabilityPolicy: policy.DurabilityCrossCell,
+						SidecarDbName:    "_vt",
+					}, nil),
+				},
+				{
+					ShardInfo: topo.NewShardInfo("ks2", "-", &topodatapb.Shard{
+						KeyRange:         &topodatapb.KeyRange{},
+						IsPrimaryServing: false,
+					}, nil),
+				},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create a memory topo server for the test.
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			ts := memorytopo.NewServer(ctx)
+			defer ts.Close()
+
+			// Do the initial setup before starting the watch.
+			tt.setupFunc(t, ts)
+
+			// Start the watch and verify the initial records received.
+			watchCtx, watchCancel := context.WithCancel(ctx)
+			defer watchCancel()
+			initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
+			require.NoError(t, err)
+			elementsMatchFunc(t, tt.wantInitRecords, initRecords, watchKeyspacePrefixDataMatches)
+
+			// We start a goroutine to collect all the records from the channel.
+			wg := sync.WaitGroup{}
+			wg.Add(1)
+			go func() {
+				defer wg.Done()
+				idx := 0
+				for data := range ch {
+					if topo.IsErrType(data.Err, topo.Interrupted) {
+						continue
+					}
+					require.GreaterOrEqual(t, len(tt.wantChanRecords), idx+1)
+					require.True(t, watchKeyspacePrefixDataMatches(tt.wantChanRecords[idx], data))
+					idx++
+					// Stop the watch after we have verified we have received all required updates.
+					if idx == len(tt.wantChanRecords) {
+						watchCancel()
+					}
+				}
+			}()
+
+			// Update the records and verify the records received on the channel.
+			tt.updateFunc(t, ts)
+			if len(tt.wantChanRecords) == 0 {
+				// If there are no records to verify, we can stop the watch.
+				watchCancel()
+			}
+			// Wait for the goroutine to finish.
+			wg.Wait()
+		})
+	}
+}
+
+// elementsMatchFunc checks if two slices have the same elements (ignoring order),
+// using a custom equality function to compare elements.
+func elementsMatchFunc[T any](t *testing.T, expected, actual []T, equalFn func(a, b T) bool) {
+	require.Len(t, actual, len(expected))
+
+	visited := make([]bool, len(actual))
+	for _, exp := range expected {
+		found := false
+		for i, act := range actual {
+			if visited[i] {
+				continue
+			}
+			if equalFn(exp, act) {
+				visited[i] = true
+				found = true
+				break
+			}
+		}
+		require.True(t, found, "Expected element %+v not found in actual slice", exp)
+	}
+}
+
+// watchKeyspacePrefixDataMatches is a helper function to check equality of two topo.WatchKeyspacePrefixData.
+func watchKeyspacePrefixDataMatches(a, b *topo.WatchKeyspacePrefixData) bool {
+	if a == nil || b == nil {
+		return a == b
+	}
+	if !errors.Is(a.Err, b.Err) {
+		return false
+	}
+	if a.KeyspaceInfo == nil || b.KeyspaceInfo == nil {
+		if a.KeyspaceInfo != b.KeyspaceInfo {
+			return false
+		}
+	} else {
+		if !proto.Equal(a.KeyspaceInfo.Keyspace, b.KeyspaceInfo.Keyspace) {
+			return false
+		}
+		if a.KeyspaceInfo.KeyspaceName() != b.KeyspaceInfo.KeyspaceName() {
+			return false
+		}
+	}
+
+	if a.ShardInfo == nil || b.ShardInfo == nil {
+		if a.ShardInfo != b.ShardInfo {
+			return false
+		}
+	} else {
+		if !proto.Equal(a.ShardInfo.Shard, b.ShardInfo.Shard) {
+			return false
+		}
+		if a.ShardInfo.Keyspace() != b.ShardInfo.Keyspace() {
+			return false
+		}
+		if a.ShardInfo.ShardName() != b.ShardInfo.ShardName() {
+			return false
+		}
+	}
+
+	return true
+}
diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go
index 9d703a2869a..2f1bfed34fb 100644
--- a/go/vt/topo/memorytopo/memorytopo.go
+++ b/go/vt/topo/memorytopo/memorytopo.go
@@ -23,6 +23,7 @@ import (
 	"context"
 	"errors"
 	"math/rand/v2"
+	"path"
 	"regexp"
 	"strings"
 	"sync"
@@ -218,6 +219,13 @@ func (n *node) isDirectory() bool {
 	return n.children != nil
 }
 
+func (n *node) getFullPath() string {
+	if n.parent == nil {
+		return n.name
+	}
+	return path.Join(n.parent.getFullPath(), n.name)
+}
+
 func (n *node) recurseContents(callback func(n *node)) {
 	if n.isDirectory() {
 		for _, child := range n.children {
diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go
index dcb90a8f0ef..a8a80f20a0f 100644
--- a/go/vt/topo/memorytopo/watch.go
+++ b/go/vt/topo/memorytopo/watch.go
@@ -104,7 +104,7 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.WatchDataRecursive, <-chan *topo.WatchDataRecursive, error) {
 	var initialwd []*topo.WatchDataRecursive
 	n.recurseContents(func(n *node) {
 		initialwd = append(initialwd, &topo.WatchDataRecursive{
-			Path: n.name,
+			Path: n.getFullPath(),
 			WatchData: topo.WatchData{
 				Contents: n.contents,
 				Version:  NodeVersion(n.version),
diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go
index 6bd4aae5b62..915bcd18e3c 100644
--- a/go/vt/topo/shard_test.go
+++ b/go/vt/topo/shard_test.go
@@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) {
 			},
 			valid: true,
 		},
+		{
+			name: "-",
+			expectedRange: &topodatapb.KeyRange{
+				Start: []byte{},
+				End:   []byte{},
+			},
+			valid: true,
+		},
 		{
 			name: "40-80",
 			expectedRange: &topodatapb.KeyRange{
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go
index 0dd17cb65fd..5beb3c8eb3b 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go
@@ -18,9 +18,7 @@ package logic
 
 import (
 	"context"
-	"sort"
-	"strings"
-	"sync"
+	"time"
 
 	"vitess.io/vitess/go/vt/log"
 
@@ -28,62 +26,74 @@ import (
 	"vitess.io/vitess/go/vt/vtorc/inst"
 )
 
-// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
-func RefreshAllKeyspacesAndShards(ctx context.Context) error {
-	var keyspaces []string
-	if len(clustersToWatch) == 0 { // all known keyspaces
-		ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
-		defer cancel()
-		var err error
-		// Get all the keyspaces
-		keyspaces, err = ts.GetKeyspaces(ctx)
-		if err != nil {
-			return err
-		}
-	} else {
-		// Parse input and build list of keyspaces
-		for _, ks := range clustersToWatch {
-			if strings.Contains(ks, "/") {
-				// This is a keyspace/shard specification
-				input := strings.Split(ks, "/")
-				keyspaces = append(keyspaces, input[0])
-			} else {
-				// Assume this is a keyspace
-				keyspaces = append(keyspaces, ks)
+// setupKeyspaceAndShardRecordsWatch sets up a watch on all keyspace and shard records.
+func setupKeyspaceAndShardRecordsWatch(ctx context.Context, ts *topo.Server) {
+	go func() {
+		for {
+			if ctx.Err() != nil {
+				return
+			}
+			initialRecs, updateChan, err := ts.WatchAllKeyspaceAndShardRecords(ctx)
+			if err != nil {
+				if ctx.Err() != nil {
+					return
+				}
+				// Back off for a while and then try setting up the watch again.
+				time.Sleep(10 * time.Second)
+				continue
+			}
+			for _, rec := range initialRecs {
+				err = processKeyspacePrefixWatchUpdate(rec)
+				if err != nil {
+					log.Errorf("failed to process initial keyspace/shard record: %+v", err)
+					break
+				}
+			}
+			if err != nil {
+				continue
+			}
+
+			for data := range updateChan {
+				err = processKeyspacePrefixWatchUpdate(data)
+				if err != nil {
+					log.Errorf("failed to process keyspace/shard record update: %+v", err)
+					break
+				}
 			}
 		}
-		if len(keyspaces) == 0 {
-			log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
-			return nil
-		}
+	}()
+}
+
+// processKeyspacePrefixWatchUpdate processes a keyspace prefix watch update.
+func processKeyspacePrefixWatchUpdate(wd *topo.WatchKeyspacePrefixData) error {
+	// We ignore the error in the watch data.
+	// If there is an error that closes the watch, then
+	// we will open it again.
+	if wd.Err != nil {
+		return nil
 	}
+	if wd.KeyspaceInfo != nil {
+		return processKeyspaceUpdate(wd)
+	} else if wd.ShardInfo != nil {
+		return processShardUpdate(wd)
+	}
+	return wd.Err
+}
 
-	// Sort the list of keyspaces.
-	// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
-	sort.Strings(keyspaces)
-	refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
-	defer refreshCancel()
-	var wg sync.WaitGroup
-	for idx, keyspace := range keyspaces {
-		// Check if the current keyspace name is the same as the last one.
-		// If it is, then we know we have already refreshed its information.
-		// We do not need to do it again.
-		if idx != 0 && keyspace == keyspaces[idx-1] {
-			continue
-		}
-		wg.Add(2)
-		go func(keyspace string) {
-			defer wg.Done()
-			_ = refreshKeyspaceHelper(refreshCtx, keyspace)
-		}(keyspace)
-		go func(keyspace string) {
-			defer wg.Done()
-			_ = refreshAllShards(refreshCtx, keyspace)
-		}(keyspace)
+// processShardUpdate processes a shard update.
+func processShardUpdate(wd *topo.WatchKeyspacePrefixData) error {
+	if !shardPartOfWatch(wd.ShardInfo.Keyspace(), wd.ShardInfo.GetKeyRange()) {
+		return nil
 	}
-	wg.Wait()
+	return inst.SaveShard(wd.ShardInfo)
+}
 
-	return nil
+// processKeyspaceUpdate processes a keyspace update.
+func processKeyspaceUpdate(wd *topo.WatchKeyspacePrefixData) error {
+	if !keyspacePartOfWatch(wd.KeyspaceInfo.KeyspaceName()) {
+		return nil
+	}
+	return inst.SaveKeyspace(wd.KeyspaceInfo)
 }
 
 // RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
@@ -123,28 +133,6 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
 	return err
 }
 
-// refreshAllShards refreshes all the shard records in the given keyspace.
-func refreshAllShards(ctx context.Context, keyspaceName string) error {
-	shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
-		// Fetch shard records concurrently to speed up discovery. A typical
-		// Vitess cluster will have 1-3 vtorc instances deployed, so there is
-		// little risk of a thundering herd.
-		Concurrency: 8,
-	})
-	if err != nil {
-		log.Error(err)
-		return err
-	}
-	for _, shardInfo := range shardInfos {
-		err = inst.SaveShard(shardInfo)
-		if err != nil {
-			log.Error(err)
-			return err
-		}
-	}
-	return nil
-}
-
 // refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard.
 func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error {
 	shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName)
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
index 8218af45db6..a1a80b5138c 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
@@ -20,10 +20,12 @@ import (
 	"context"
 	"fmt"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 
+	"vitess.io/vitess/go/vt/external/golib/sqlutils"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -52,7 +54,86 @@ var (
 	}
 )
 
-func TestRefreshAllKeyspaces(t *testing.T) {
+// TestSetupKeyspaceAndShardRecordsWatch tests that the watch is set up correctly for keyspace and shard records.
+func TestSetupKeyspaceAndShardRecordsWatch(t *testing.T) {
+	// Store the old flags and restore on test completion
+	oldTs := ts
+	oldClustersToWatch := clustersToWatch
+	defer func() {
+		ts = oldTs
+		clustersToWatch = oldClustersToWatch
+	}()
+
+	db.ClearVTOrcDatabase()
+	defer func() {
+		db.ClearVTOrcDatabase()
+	}()
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	ts = memorytopo.NewServer(ctx, "zone1")
+
+	for _, ks := range []string{"ks1", "ks2"} {
+		err := ts.CreateKeyspace(ctx, ks, keyspaceDurabilityNone)
+		require.NoError(t, err)
+		for idx, sh := range []string{"-80", "80-"} {
+			err = ts.CreateShard(ctx, ks, sh)
+			require.NoError(t, err)
+			_, err = ts.UpdateShardFields(ctx, ks, sh, func(si *topo.ShardInfo) error {
+				si.PrimaryAlias = &topodatapb.TabletAlias{
+					Cell: fmt.Sprintf("zone_%v", ks),
+					Uid:  uint32(100 + idx),
+				}
+				return nil
+			})
+			require.NoError(t, err)
+		}
+	}
+
+	// Set up the keyspace and shard watch.
+	setupKeyspaceAndShardRecordsWatch(ctx, ts)
+	waitForKeyspaceCount(t, 2)
+	// Verify that we only have ks1 and ks2 in vtorc's db.
+	verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
+	verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "")
+	verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
+	verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
+	verifyKeyspaceInfo(t, "ks3", nil, "keyspace not found")
+	verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
+	verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")
+
+	// Update primary on the shard.
+	_, err := ts.UpdateShardFields(ctx, "ks1", "-80", func(si *topo.ShardInfo) error {
+		si.PrimaryAlias.Cell = "updated_new_cell"
+		return nil
+	})
+	require.NoError(t, err)
+
+	// Delete a shard.
+	// We verify that VTOrc does not delete the shard info;
+	// delete updates are ignored for now.
+	err = ts.DeleteShard(ctx, "ks2", "80-")
+	require.NoError(t, err)
+
+	// Create a new keyspace record.
+	err = ts.CreateKeyspace(ctx, "ks3", keyspaceDurabilitySemiSync)
+	require.NoError(t, err)
+
+	// Check that the watch sees these updates.
+	waitForKeyspaceCount(t, 3)
+	// Verify the updated records for ks1, ks2, and the new ks3 in vtorc's db.
+	verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
+	verifyPrimaryAlias(t, "ks1", "-80", "updated_new_cell-0000000100", "")
+	verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
+	verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
+	verifyKeyspaceInfo(t, "ks3", keyspaceDurabilitySemiSync, "")
+	verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
+	verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")
+}
+
+// TestInitialSetupOfWatch tests that the initial setup of the watch for shards
+// and keyspaces loads the latest information from the topo server.
+func TestInitialSetupOfWatch(t *testing.T) {
 	// Store the old flags and restore on test completion
 	oldTs := ts
 	oldClustersToWatch := clustersToWatch
@@ -93,7 +174,11 @@ func TestRefreshAllKeyspaces(t *testing.T) {
 	// Set clusters to watch to only watch ks1 and ks3
 	onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
 	clustersToWatch = onlyKs1and3
-	require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
+	initializeShardsToWatch()
+	watchCtx, watchCancel := context.WithCancel(context.Background())
+	setupKeyspaceAndShardRecordsWatch(watchCtx, ts)
+	waitForKeyspaceCount(t, 2)
+	watchCancel()
 
 	// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "") @@ -106,9 +191,13 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to watch all keyspaces clustersToWatch = nil + initializeShardsToWatch() // Change the durability policy of ks1 reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync) - require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) + watchCtx, watchCancel = context.WithCancel(context.Background()) + setupKeyspaceAndShardRecordsWatch(watchCtx, ts) + waitForKeyspaceCount(t, 4) + watchCancel() // Verify that all the keyspaces are correctly reloaded verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "") @@ -119,7 +208,30 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "") verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") +} +// waitForKeyspaceCount waits for the keyspace count to match the expected value. +func waitForKeyspaceCount(t *testing.T, count int) { + t.Helper() + timeout := time.After(10 * time.Second) + for { + select { + case <-timeout: + t.Errorf("timed out waiting for keyspace count") + return + default: + } + var curCount = 0 + err := db.QueryVTOrcRowsMap("select count(*) as c from vitess_keyspace", func(row sqlutils.RowMap) error { + curCount = row.GetInt("c") + return nil + }) + require.NoError(t, err) + if curCount == count { + return + } + time.Sleep(100 * time.Millisecond) + } } func TestRefreshKeyspace(t *testing.T) { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index eb10bb2a667..b06177d1b1a 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -32,11 +32,11 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/vtorc/config" "vitess.io/vitess/go/vt/vtorc/db" "vitess.io/vitess/go/vt/vtorc/inst" "vitess.io/vitess/go/vt/vttablet/tmclient" @@ -48,8 +48,10 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 - shardsToWatch map[string][]string - shardsToWatchMu sync.Mutex + // shardsToWatch is a map storing the shards for a given keyspace that need to be watched. + // We store the key range for all the shards that we want to watch. + // This is populated by parsing `--clusters_to_watch` flag. + shardsToWatch map[string][]*topodatapb.KeyRange // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") @@ -57,18 +59,18 @@ var ( // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. 
Example: \"ks1,ks2/-80\"") fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } -// updateShardsToWatch parses the --clusters_to_watch flag-value +// initializeShardsToWatch parses the --clusters_to_watch flag-value // into a map of keyspace/shards. -func updateShardsToWatch() { +func initializeShardsToWatch() { + shardsToWatch = make(map[string][]*topodatapb.KeyRange) if len(clustersToWatch) == 0 { return } - newShardsToWatch := make(map[string][]string, 0) for _, ks := range clustersToWatch { if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { // Validate keyspace/shard parses. @@ -77,39 +79,65 @@ func updateShardsToWatch() { log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) continue } - newShardsToWatch[k] = append(newShardsToWatch[k], s) - } else { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - // Assume this is a keyspace and find all shards in keyspace. - // Remove trailing slash if exists. - ks = strings.TrimSuffix(ks, "/") - shards, err := ts.GetShardNames(ctx, ks) + // Parse the shard name into key range value. + _, keyRange, err := topo.ValidateShardName(s) if err != nil { - // Log the err and continue. - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue + log.Errorf("Could not parse shard name %q: %+v", s, err) } - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue + // If the key range is nil, then the user is not using RangeBased Sharding. + // So we want to watch all the shards of the keyspace. + if keyRange == nil { + keyRange = key.NewCompleteKeyRange() } - newShardsToWatch[ks] = shards + shardsToWatch[k] = append(shardsToWatch[k], keyRange) + } else { + // Remove trailing slash if exists. + ks = strings.TrimSuffix(ks, "/") + // We store the entire range of key range if nothing is specified. + shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()} } } - if len(newShardsToWatch) == 0 { - log.Error("No keyspace/shards to watch") - return + + if len(shardsToWatch) == 0 { + log.Error("No keyspace/shards to watch, watching all keyspaces") } +} - shardsToWatchMu.Lock() - defer shardsToWatchMu.Unlock() - shardsToWatch = newShardsToWatch +// keyspacePartOfWatch checks if the given keyspace is part of the watch list. +func keyspacePartOfWatch(keyspace string) bool { + // If we are watching all keyspaces, then we want to watch this keyspace too. + if len(shardsToWatch) == 0 { + return true + } + _, shouldWatch := shardsToWatch[keyspace] + return shouldWatch +} + +// shardPartOfWatch checks if the given tablet is part of the watch list. +func shardPartOfWatch(keyspace string, keyRange *topodatapb.KeyRange) bool { + // If we are watching all keyspaces, then we want to watch this tablet too. + if len(shardsToWatch) == 0 { + return true + } + shardRanges, ok := shardsToWatch[keyspace] + // If we don't have the keyspace in our map, then this tablet + // doesn't need to be watched. + if !ok { + return false + } + + // Check if the key range is part of the shard ranges we are watching. + for _, shardRange := range shardRanges { + if key.KeyRangeContainsKeyRange(shardRange, keyRange) { + return true + } + } + return false } -// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker +// OpenDiscoveryFromTopo opens the vitess topo if enables and returns a ticker // channel for polling. 
-func OpenTabletDiscovery() <-chan time.Time {
+func OpenDiscoveryFromTopo() {
 	ts = topo.Open()
 	tmc = inst.InitializeTMC()
 	// Clear existing cache and perform a new refresh.
 	if _, err := db.ExecVTOrc(`delete from vitess_tablet`); err != nil {
 		log.Error(err)
 	}
 	// Parse --clusters_to_watch into a filter.
-	updateShardsToWatch()
+	initializeShardsToWatch()
+	setupKeyspaceAndShardRecordsWatch(context.Background(), ts)
 	// We refresh all information from the topo once before we start the ticks to do
 	// it on a timer.
 	ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
 	defer cancel()
-	if err := refreshAllInformation(ctx); err != nil {
+	if err := refreshTopoTick(ctx); err != nil {
 		log.Errorf("failed to initialize topo information: %+v", err)
 	}
-	return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
 }
 
 // getAllTablets gets all tablets from all cells using a goroutine per cell.
@@ -179,16 +207,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
 	// Filter tablets that should not be watched using shardsToWatch map.
 	matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
 	func() {
-		shardsToWatchMu.Lock()
-		defer shardsToWatchMu.Unlock()
 		for _, t := range tablets {
-			if len(shardsToWatch) > 0 {
-				_, ok := shardsToWatch[t.Tablet.Keyspace]
-				if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
-					continue // filter
-				}
+			if shardPartOfWatch(t.Tablet.GetKeyspace(), t.Tablet.GetKeyRange()) {
+				matchedTablets = append(matchedTablets, t)
 			}
-			matchedTablets = append(matchedTablets, t)
 		}
 	}()
 
diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go
index 54284e8a017..90cd684e125 100644
--- a/go/vt/vtorc/logic/tablet_discovery_test.go
+++ b/go/vt/vtorc/logic/tablet_discovery_test.go
@@ -30,6 +30,7 @@ import (
 	"google.golang.org/protobuf/proto"
 
 	"vitess.io/vitess/go/vt/external/golib/sqlutils"
+	"vitess.io/vitess/go/vt/key"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/proto/vttime"
 	"vitess.io/vitess/go/vt/topo"
@@ -102,60 +103,209 @@ var (
 	}
 )
 
-func TestUpdateShardsToWatch(t *testing.T) {
+func TestPartOfWatch(t *testing.T) {
 	oldClustersToWatch := clustersToWatch
-	oldTs := ts
 	defer func() {
 		clustersToWatch = oldClustersToWatch
 		shardsToWatch = nil
-		ts = oldTs
 	}()
 
-	// Create a memory topo-server and create the keyspace and shard records
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
+	testCases := []struct {
+		in                       []string
+		tablet                   *topodatapb.Tablet
+		expectedShardPartOfWatch bool
+		expectedKsPartOfWatch    bool
+	}{
+		{
+			in: []string{},
+			tablet: &topodatapb.Tablet{
+				Keyspace: keyspace,
+				Shard:    shard,
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{keyspace},
+			tablet: &topodatapb.Tablet{
+				Keyspace: keyspace,
+				Shard:    shard,
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{keyspace + "/-"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: keyspace,
+				Shard:    shard,
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{keyspace + "/" + shard},
+			tablet: &topodatapb.Tablet{
+				Keyspace: keyspace,
+				Shard:    shard,
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks/-70", "ks/70-"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks/-70", "ks/70-"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}),
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks/-70", "ks/70-"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}),
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks/-70", "ks/70-"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}),
+			},
+			expectedShardPartOfWatch: false,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks/50-70"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}),
+			},
+			expectedShardPartOfWatch: true,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}),
+			},
+			expectedShardPartOfWatch: false,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "ks",
+				KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}),
+			},
+			expectedShardPartOfWatch: false,
+			expectedKsPartOfWatch:    true,
+		},
+		{
+			in: []string{"ks2/-70", "ks2/70-", "ks/-80"},
+			tablet: &topodatapb.Tablet{
+				Keyspace: "unknownKs",
+				KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}),
+			},
+			expectedShardPartOfWatch: false,
+			expectedKsPartOfWatch:    false,
+		},
+	}
 
-	ts = memorytopo.NewServer(ctx, cell1)
-	_, err := ts.GetOrCreateShard(context.Background(), keyspace, shard)
-	require.NoError(t, err)
+	for _, tt := range testCases {
+		t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) {
+			clustersToWatch = tt.in
+			initializeShardsToWatch()
+			assert.Equal(t, tt.expectedShardPartOfWatch, shardPartOfWatch(tt.tablet.GetKeyspace(), tt.tablet.GetKeyRange()))
+			assert.Equal(t, tt.expectedKsPartOfWatch, keyspacePartOfWatch(tt.tablet.GetKeyspace()))
+		})
+	}
+}
+
+// TestInitializeShardsToWatch tests that we initialize the shardsToWatch map correctly
+// using the `--clusters_to_watch` flag.
+func TestInitializeShardsToWatch(t *testing.T) {
+	oldClustersToWatch := clustersToWatch
+	defer func() {
+		clustersToWatch = oldClustersToWatch
+		shardsToWatch = nil
+	}()
 
 	testCases := []struct {
 		in       []string
-		expected map[string][]string
+		expected map[string][]*topodatapb.KeyRange
 	}{
 		{
 			in:       []string{},
-			expected: nil,
+			expected: map[string][]*topodatapb.KeyRange{},
 		},
 		{
-			in:       []string{""},
-			expected: map[string][]string{},
+			in: []string{"unknownKs"},
+			expected: map[string][]*topodatapb.KeyRange{
+				"unknownKs": {
+					key.NewCompleteKeyRange(),
+				},
+			},
 		},
 		{
 			in: []string{"test/-"},
-			expected: map[string][]string{
-				"test": {"-"},
+			expected: map[string][]*topodatapb.KeyRange{
+				"test": {
+					key.NewCompleteKeyRange(),
+				},
 			},
 		},
 		{
 			in: []string{"test/-", "test2/-80", "test2/80-"},
-			expected: map[string][]string{
-				"test":  {"-"},
-				"test2": {"-80", "80-"},
+			expected: map[string][]*topodatapb.KeyRange{
+				"test": {
+					key.NewCompleteKeyRange(),
+				},
+				"test2": {
+					key.NewKeyRange([]byte{}, []byte{0x80}),
+					key.NewKeyRange([]byte{0x80}, []byte{}),
+				},
			},
 		},
 		{
-			// confirm shards fetch from topo
+			// known keyspace
 			in: []string{keyspace},
-			expected: map[string][]string{
-				keyspace: {shard},
+			expected: map[string][]*topodatapb.KeyRange{
+				keyspace: {
+					key.NewCompleteKeyRange(),
+				},
 			},
 		},
 		{
-			// confirm shards fetch from topo when keyspace has trailing-slash
+			// keyspace with trailing-slash
 			in: []string{keyspace + "/"},
-			expected: map[string][]string{
-				keyspace: {shard},
+			expected: map[string][]*topodatapb.KeyRange{
+				keyspace: {
+					key.NewCompleteKeyRange(),
+				},
 			},
 		},
 	}
@@ -163,10 +313,10 @@ func TestUpdateShardsToWatch(t *testing.T) {
 	for _, testCase := range testCases {
 		t.Run(strings.Join(testCase.in, ","), func(t *testing.T) {
 			defer func() {
-				shardsToWatch = make(map[string][]string, 0)
+				shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0)
 			}()
 			clustersToWatch = testCase.in
-			updateShardsToWatch()
+			initializeShardsToWatch()
 			require.Equal(t, testCase.expected, shardsToWatch)
 		})
 	}
diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go
index 1fde6e31c0d..5ed1e605015 100644
--- a/go/vt/vtorc/logic/vtorc.go
+++ b/go/vt/vtorc/logic/vtorc.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/patrickmn/go-cache"
 	"github.com/sjmudd/stopwatch"
-	"golang.org/x/sync/errgroup"
 
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/log"
@@ -255,10 +254,11 @@ func ContinuousDiscovery() {
 
 	go handleDiscoveryRequests()
 
+	OpenDiscoveryFromTopo()
 	healthTick := time.Tick(config.HealthPollSeconds * time.Second)
 	caretakingTick := time.Tick(time.Minute)
 	recoveryTick := time.Tick(config.GetRecoveryPollDuration())
-	tabletTopoTick := OpenTabletDiscovery()
+	tabletTopoTick := time.Tick(config.GetTopoInformationRefreshDuration())
 	var recoveryEntrance int64
 	var snapshotTopologiesTick <-chan time.Time
 	if config.GetSnapshotTopologyInterval() > 0 {
@@ -308,7 +308,7 @@ func ContinuousDiscovery() {
 			}()
 		case <-tabletTopoTick:
 			ctx, cancel := context.WithTimeout(context.Background(), config.GetTopoInformationRefreshDuration())
-			if err := refreshAllInformation(ctx); err != nil {
+			if err := refreshTopoTick(ctx); err != nil {
 				log.Errorf("failed to refresh topo information: %+v", err)
 			}
 			cancel()
@@ -316,29 +316,9 @@ func ContinuousDiscovery() {
 	}
 }
 
-// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
-func refreshAllInformation(ctx context.Context) error {
-	// Create an errgroup
-	eg, ctx := errgroup.WithContext(ctx)
-
-	// Refresh all keyspace information.
-	eg.Go(func() error {
-		return RefreshAllKeyspacesAndShards(ctx)
-	})
-
-	// Refresh shards to watch.
-	eg.Go(func() error {
-		updateShardsToWatch()
-		return nil
-	})
-
-	// Refresh all tablets.
-	eg.Go(func() error {
-		return refreshAllTablets(ctx)
-	})
-
-	// Wait for both the refreshes to complete
-	err := eg.Wait()
+// refreshTopoTick refreshes tablet information from the topo server on a time tick.
+func refreshTopoTick(ctx context.Context) error {
+	err := refreshAllTablets(ctx)
 	if err == nil {
 		process.FirstDiscoveryCycleComplete.Store(true)
 	}
diff --git a/go/vt/vtorc/logic/vtorc_test.go b/go/vt/vtorc/logic/vtorc_test.go
index edd8141e8b7..8bb34f94628 100644
--- a/go/vt/vtorc/logic/vtorc_test.go
+++ b/go/vt/vtorc/logic/vtorc_test.go
@@ -61,7 +61,7 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
 	return time.Since(start)
 }
 
-func TestRefreshAllInformation(t *testing.T) {
+func TestRefreshTopoTick(t *testing.T) {
 	// Store the old flags and restore on test completion
 	oldTs := ts
 	defer func() {
@@ -85,7 +85,7 @@ func TestRefreshAllInformation(t *testing.T) {
 	// Test error
 	ctx, cancel := context.WithCancel(context.Background())
 	cancel() // cancel context to simulate timeout
-	require.Error(t, refreshAllInformation(ctx))
+	require.Error(t, refreshTopoTick(ctx))
 	require.False(t, process.FirstDiscoveryCycleComplete.Load())
 	_, discoveredOnce = process.HealthTest()
 	require.False(t, discoveredOnce)
@@ -93,7 +93,7 @@ func TestRefreshAllInformation(t *testing.T) {
 	// Test success
 	ctx2, cancel2 := context.WithCancel(context.Background())
 	defer cancel2()
-	require.NoError(t, refreshAllInformation(ctx2))
+	require.NoError(t, refreshTopoTick(ctx2))
 	require.True(t, process.FirstDiscoveryCycleComplete.Load())
 	_, discoveredOnce = process.HealthTest()
 	require.True(t, discoveredOnce)
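
Reviewer note (illustrative, not part of the patch): the consumer contract of the new WatchAllKeyspaceAndShardRecords API is easy to get wrong, so here is a minimal Go sketch of a caller, using only the types this PR adds in go/vt/topo/keyspace.go. The names watchTopoRecords and handleRecord are hypothetical, and the retry loop that setupKeyspaceAndShardRecordsWatch layers on top is deliberately omitted.

```go
package example

import (
	"context"
	"log"

	"vitess.io/vitess/go/vt/topo"
)

// watchTopoRecords consumes the keyspace/shard prefix watch until it ends.
func watchTopoRecords(ctx context.Context, ts *topo.Server) error {
	// initial holds one record per keyspace and shard that already exists.
	initial, updates, err := ts.WatchAllKeyspaceAndShardRecords(ctx)
	if err != nil {
		return err
	}
	for _, rec := range initial {
		handleRecord(rec)
	}
	// Every subsequent create/update/delete arrives on the channel. When the
	// watch dies (e.g. ctx is cancelled), a final error-only record is sent
	// and the channel is closed, so this loop terminates.
	for rec := range updates {
		handleRecord(rec)
	}
	return nil
}

// handleRecord dispatches on which record type was updated. On a deletion,
// Err is set alongside the (now empty) KeyspaceInfo or ShardInfo.
func handleRecord(rec *topo.WatchKeyspacePrefixData) {
	switch {
	case rec.KeyspaceInfo != nil:
		log.Printf("keyspace %s updated (err: %v)", rec.KeyspaceInfo.KeyspaceName(), rec.Err)
	case rec.ShardInfo != nil:
		log.Printf("shard %s/%s updated (err: %v)", rec.ShardInfo.Keyspace(), rec.ShardInfo.ShardName(), rec.Err)
	default:
		log.Printf("watch terminated: %v", rec.Err)
	}
}
```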
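Reviewer note (illustrative, not part of the patch): to make the changelog example concrete, this sketch shows how key-range containment decides whether a post-reshard shard stays watched, using the pre-existing topo.ValidateShardName and key.KeyRangeContainsKeyRange helpers that the new shardPartOfWatch builds on. Assuming VTOrc watches `ks/-80`, it prints true for `-40` and `40-80`, and false for `80-`, which is why no restart is needed after the split.

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/key"
	"vitess.io/vitess/go/vt/topo"
)

func main() {
	// The range VTOrc was configured with, e.g. --clusters_to_watch=ks/-80.
	_, watched, err := topo.ValidateShardName("-80")
	if err != nil {
		panic(err)
	}
	// Shards that exist after -80 is split, plus one outside the range.
	for _, shard := range []string{"-40", "40-80", "80-"} {
		_, kr, err := topo.ValidateShardName(shard)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s still watched: %v\n", shard, key.KeyRangeContainsKeyRange(watched, kr))
	}
}
```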