diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md
index cbb1f37889c..c6c1eb0148e 100644
--- a/changelog/22.0/22.0.0/summary.md
+++ b/changelog/22.0/22.0.0/summary.md
@@ -15,6 +15,7 @@
- **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
+ - **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
@@ -133,6 +134,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit
The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].
+### KeyRanges in `--clusters_to_watch` in VTOrc
+VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This removes the need to restart a VTOrc instance when a reshard changes the set of shards it should watch.
+For example, if a VTOrc instance is configured to watch `ks/-80`, it watches all the shards that fall within the KeyRange `-80`. If a reshard splits `-80` into the new shards `-40` and `40-80`, the VTOrc instance automatically starts watching the new shards without needing a restart.
+Users can still specify exact key ranges; the new feature is backward compatible.
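+As a sketch, a VTOrc instance can be limited to one keyspace plus a key range of another like so (reusing the flag's documented example value; all other flags elided):
+```
+vtorc --clusters_to_watch="ks1,ks2/-80" ...
+```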
+
### Support for Filtering Query logs on Error
The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt
index ca8083709e5..c9d77be6082 100644
--- a/go/flags/endtoend/vtorc.txt
+++ b/go/flags/endtoend/vtorc.txt
@@ -24,7 +24,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
- --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
+ --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
diff --git a/go/test/endtoend/topotest/consul/main_test.go b/go/test/endtoend/topotest/consul/main_test.go
index b71551dc6b7..f7ab52e0a68 100644
--- a/go/test/endtoend/topotest/consul/main_test.go
+++ b/go/test/endtoend/topotest/consul/main_test.go
@@ -26,7 +26,9 @@ import (
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
@@ -221,6 +223,107 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}
+// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
+// We test out different updates and see if we receive the correct update
+// from the watch.
+func TestWatchAllKeyspaceRecords(t *testing.T) {
+ // Create the topo server connection.
+ ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
+ require.NoError(t, err)
+
+ watchCtx, watchCancel := context.WithCancel(context.Background())
+ defer watchCancel()
+ initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
+ require.NoError(t, err)
+
+ // Check that we have the initial records.
+ // The existing keyspace and shard records should be seen.
+ require.Len(t, initRecords, 2)
+ var ksInfo *topo.KeyspaceInfo
+ var shardInfo *topo.ShardInfo
+ for _, record := range initRecords {
+ if record.KeyspaceInfo != nil {
+ ksInfo = record.KeyspaceInfo
+ }
+ if record.ShardInfo != nil {
+ shardInfo = record.ShardInfo
+ }
+ }
+ require.NotNil(t, ksInfo)
+ require.NotNil(t, shardInfo)
+ require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
+ require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
+ require.EqualValues(t, "0", shardInfo.ShardName())
+
+ // Create a new keyspace record and see that we receive an update.
+ newKeyspaceName := "ksTest"
+ err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ })
+ require.NoError(t, err)
+ defer func() {
+ err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record := <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+
+ // Creating a shard should also trigger an update.
+ err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ primaryAlias := &topodatapb.TabletAlias{
+ Cell: cell,
+ Uid: 100,
+ }
+ // Updating a shard should also trigger an update.
+ _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
+ si.PrimaryAlias = primaryAlias
+ return nil
+ })
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ // Deleting a shard should also trigger an update.
+ err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Error(t, record.Err)
+
+ // Update the keyspace record and see that we receive an update.
+ func() {
+ ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
+ require.NoError(t, err)
+ defer unlock(&err)
+ ki.DurabilityPolicy = policy.DurabilityCrossCell
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+}
+
func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
diff --git a/go/test/endtoend/topotest/etcd2/main_test.go b/go/test/endtoend/topotest/etcd2/main_test.go
index b274219b41a..ac13e966689 100644
--- a/go/test/endtoend/topotest/etcd2/main_test.go
+++ b/go/test/endtoend/topotest/etcd2/main_test.go
@@ -26,7 +26,9 @@ import (
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/log"
@@ -268,6 +270,107 @@ func TestNamedLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}
+// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
+// We test out different updates and see if we receive the correct update
+// from the watch.
+func TestWatchAllKeyspaceRecords(t *testing.T) {
+ // Create the topo server connection.
+ ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
+ require.NoError(t, err)
+
+ watchCtx, watchCancel := context.WithCancel(context.Background())
+ defer watchCancel()
+ initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
+ require.NoError(t, err)
+
+ // Check that we have the initial records.
+ // The existing keyspace and shard records should be seen.
+ require.Len(t, initRecords, 2)
+ var ksInfo *topo.KeyspaceInfo
+ var shardInfo *topo.ShardInfo
+ for _, record := range initRecords {
+ if record.KeyspaceInfo != nil {
+ ksInfo = record.KeyspaceInfo
+ }
+ if record.ShardInfo != nil {
+ shardInfo = record.ShardInfo
+ }
+ }
+ require.NotNil(t, ksInfo)
+ require.NotNil(t, shardInfo)
+ require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
+ require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
+ require.EqualValues(t, "0", shardInfo.ShardName())
+
+ // Create a new keyspace record and see that we receive an update.
+ newKeyspaceName := "ksTest"
+ err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ })
+ require.NoError(t, err)
+ defer func() {
+ err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record := <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+
+ // Creating a shard should also trigger an update.
+ err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ primaryAlias := &topodatapb.TabletAlias{
+ Cell: cell,
+ Uid: 100,
+ }
+ // Updating a shard should also trigger an update.
+ _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
+ si.PrimaryAlias = primaryAlias
+ return nil
+ })
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ // Deleting a shard should also trigger an update.
+ err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Error(t, record.Err)
+
+ // Update the keyspace record and see that we receive an update.
+ func() {
+ ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
+ require.NoError(t, err)
+ defer unlock(&err)
+ ki.DurabilityPolicy = policy.DurabilityCrossCell
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+}
+
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
diff --git a/go/test/endtoend/topotest/zk2/main_test.go b/go/test/endtoend/topotest/zk2/main_test.go
index 95a2fc13894..83b257f1609 100644
--- a/go/test/endtoend/topotest/zk2/main_test.go
+++ b/go/test/endtoend/topotest/zk2/main_test.go
@@ -25,7 +25,9 @@ import (
topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
+ topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
+ "vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/log"
@@ -197,6 +199,107 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}
+// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
+// We test out different updates and see if we receive the correct update
+// from the watch.
+func TestWatchAllKeyspaceRecords(t *testing.T) {
+ // Create the topo server connection.
+ ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
+ require.NoError(t, err)
+
+ watchCtx, watchCancel := context.WithCancel(context.Background())
+ defer watchCancel()
+ initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
+ require.NoError(t, err)
+
+ // Check that we have the initial records.
+ // The existing keyspace and shard records should be seen.
+ require.Len(t, initRecords, 2)
+ var ksInfo *topo.KeyspaceInfo
+ var shardInfo *topo.ShardInfo
+ for _, record := range initRecords {
+ if record.KeyspaceInfo != nil {
+ ksInfo = record.KeyspaceInfo
+ }
+ if record.ShardInfo != nil {
+ shardInfo = record.ShardInfo
+ }
+ }
+ require.NotNil(t, ksInfo)
+ require.NotNil(t, shardInfo)
+ require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
+ require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
+ require.EqualValues(t, "0", shardInfo.ShardName())
+
+ // Create a new keyspace record and see that we receive an update.
+ newKeyspaceName := "ksTest"
+ err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ })
+ require.NoError(t, err)
+ defer func() {
+ err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record := <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+
+ // Creating a shard should also trigger an update.
+ err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ primaryAlias := &topodatapb.TabletAlias{
+ Cell: cell,
+ Uid: 100,
+ }
+ // Updating a shard should also trigger an update.
+ _, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
+ si.PrimaryAlias = primaryAlias
+ return nil
+ })
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)
+
+ // Deleting a shard should also trigger an update.
+ err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
+ require.NoError(t, err)
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
+ require.EqualValues(t, "-", record.ShardInfo.ShardName())
+ require.Error(t, record.Err)
+
+ // Update the keyspace record and see that we receive an update.
+ func() {
+ ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
+ require.NoError(t, err)
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
+ require.NoError(t, err)
+ defer unlock(&err)
+ ki.DurabilityPolicy = policy.DurabilityCrossCell
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ }()
+
+ // Wait to receive an update from the watch.
+ record = <-ch
+ require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
+ require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
+}
+
func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
diff --git a/go/vt/key/key.go b/go/vt/key/key.go
index 89d956bd433..d951d123f0b 100644
--- a/go/vt/key/key.go
+++ b/go/vt/key/key.go
@@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}
+// NewCompleteKeyRange returns a KeyRange covering the entire keyspace,
+// i.e. one with empty Start and End.
+func NewCompleteKeyRange() *topodatapb.KeyRange {
+ return &topodatapb.KeyRange{
+ Start: []byte{},
+ End: []byte{},
+ }
+}
+
// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
diff --git a/go/vt/topo/keyspace.go b/go/vt/topo/keyspace.go
index 710bbee0653..3e2b6e1badf 100755
--- a/go/vt/topo/keyspace.go
+++ b/go/vt/topo/keyspace.go
@@ -51,6 +51,15 @@ type KeyspaceInfo struct {
*topodatapb.Keyspace
}
+// NewKeyspaceInfo creates a new KeyspaceInfo.
+func NewKeyspaceInfo(name string, keyspace *topodatapb.Keyspace, version Version) *KeyspaceInfo {
+ return &KeyspaceInfo{
+ keyspace: name,
+ Keyspace: keyspace,
+ version: version,
+ }
+}
+
// KeyspaceName returns the keyspace name
func (ki *KeyspaceInfo) KeyspaceName() string {
return ki.keyspace
@@ -430,3 +439,121 @@ func (ts *Server) GetShardNames(ctx context.Context, keyspace string) ([]string,
}
return DirEntriesToStringArray(children), err
}
+
+// WatchKeyspacePrefixData wraps the data we receive on the recursive watch channel.
+// Each record carries at most one of KeyspaceInfo or ShardInfo. Err is set on its own
+// when the watch itself fails, and alongside KeyspaceInfo or ShardInfo when the
+// corresponding node was deleted.
+type WatchKeyspacePrefixData struct {
+ KeyspaceInfo *KeyspaceInfo
+ ShardInfo *ShardInfo
+ Err error
+}
+
+// WatchAllKeyspaceAndShardRecords will set a watch on the Keyspace prefix.
+// It has the same contract as conn.WatchRecursive, but it also unpacks the
+// contents into Keyspace and Shard objects.
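+//
+// A typical caller looks like this (a sketch, mirroring the end-to-end tests):
+//
+//	initial, ch, err := ts.WatchAllKeyspaceAndShardRecords(ctx)
+//	if err != nil { /* handle the error */ }
+//	for _, rec := range initial { /* inspect rec.KeyspaceInfo / rec.ShardInfo */ }
+//	for rec := range ch { /* process updates until the channel closes */ }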
+func (ts *Server) WatchAllKeyspaceAndShardRecords(ctx context.Context) ([]*WatchKeyspacePrefixData, <-chan *WatchKeyspacePrefixData, error) {
+ if err := ctx.Err(); err != nil {
+ return nil, nil, err
+ }
+
+ ctx, cancel := context.WithCancel(ctx)
+
+ // Set up a recursive watch on the KeyspacesPath.
+ current, wdChannel, err := ts.globalCell.WatchRecursive(ctx, KeyspacesPath)
+ if err != nil {
+ cancel()
+ return nil, nil, err
+ }
+ // Unpack the initial data.
+ initialRes, err := checkAndUnpackKeyspacePrefixRecord(current...)
+ if err != nil {
+ // Cancel the watch, drain channel.
+ cancel()
+ for range wdChannel {
+ }
+		return nil, nil, vterrors.Wrapf(err, "error unpacking initial Keyspace and Shard records")
+ }
+
+ changes := make(chan *WatchKeyspacePrefixData, 10)
+ // The background routine reads any event from the watch channel,
+ // translates it, and sends it to the caller.
+ // If cancel() is called, the underlying WatchRecursive() code will
+ // send an ErrInterrupted and then close the channel. We'll
+ // just propagate that back to our caller.
+ go func() {
+ defer cancel()
+ defer close(changes)
+
+ for wd := range wdChannel {
+ if wd.Err != nil {
+ if IsErrType(wd.Err, NoNode) {
+ // One of the nodes was deleted.
+					// We still have the path, so the record is processed as usual.
+					// The error is copied into the outgoing record to signal to
+					// the receiver that the node was deleted.
+ } else {
+ // Last error value, we're done.
+ // wdChannel will be closed right after
+ // this, no need to do anything.
+ changes <- &WatchKeyspacePrefixData{Err: wd.Err}
+ return
+ }
+ }
+
+ res, err := checkAndUnpackKeyspacePrefixRecord(wd)
+ if err != nil {
+ cancel()
+ for range wdChannel {
+ }
+ changes <- &WatchKeyspacePrefixData{Err: vterrors.Wrapf(err, "error unpacking object")}
+ return
+ }
+
+			// Each update carries at most a single object.
+			// We get updates for all objects under the prefix, but we only
+			// care about the Keyspace and Shard records.
+ if len(res) == 0 {
+ continue
+ }
+ changes <- res[0]
+ }
+ }()
+
+ return initialRes, changes, nil
+}
+
+// checkAndUnpackKeyspacePrefixRecord checks for Keyspace and Shard records and unpacks them.
+func checkAndUnpackKeyspacePrefixRecord(wds ...*WatchDataRecursive) ([]*WatchKeyspacePrefixData, error) {
+ var res []*WatchKeyspacePrefixData
+ for _, wd := range wds {
+ fileDir, fileType := path.Split(wd.Path)
+		// Check the type of file. Shard record paths look like
+		// path.Join(KeyspacesPath, keyspace, ShardsPath, shard, ShardFile).
+ switch fileType {
+ case KeyspaceFile:
+ // Unpack a keyspace record.
+ ksName := path.Base(fileDir)
+ value := &topodatapb.Keyspace{}
+ if err := value.UnmarshalVT(wd.Contents); err != nil {
+ return nil, err
+ }
+ res = append(res, &WatchKeyspacePrefixData{
+ Err: wd.Err,
+ KeyspaceInfo: NewKeyspaceInfo(ksName, value, wd.Version),
+ })
+ case ShardFile:
+ shardName := path.Base(fileDir)
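+			// fileDir looks like .../keyspaces/<keyspace>/shards/<shard>/, so the
+			// keyspace name is two directory levels above the shard directory.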
+ ksName := path.Base(path.Dir(path.Dir(path.Clean(fileDir))))
+ // Unpack a shard record.
+ value := &topodatapb.Shard{}
+ if err := value.UnmarshalVT(wd.Contents); err != nil {
+ return nil, err
+ }
+ res = append(res, &WatchKeyspacePrefixData{
+ Err: wd.Err,
+ ShardInfo: NewShardInfo(ksName, shardName, value, wd.Version),
+ })
+ }
+ }
+ return res, nil
+}
diff --git a/go/vt/topo/keyspace_external_test.go b/go/vt/topo/keyspace_external_test.go
index bfcb2f591a9..9de1afa514b 100644
--- a/go/vt/topo/keyspace_external_test.go
+++ b/go/vt/topo/keyspace_external_test.go
@@ -18,16 +18,20 @@ package topo_test
import (
"context"
+ "errors"
"fmt"
"slices"
+ "sync"
"testing"
"github.com/stretchr/testify/require"
+ "google.golang.org/protobuf/proto"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
+ "vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
)
@@ -219,3 +223,332 @@ func TestServerGetServingShards(t *testing.T) {
})
}
}
+
+// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
+// We test out different updates and see if we receive the correct update
+// from the watch.
+func TestWatchAllKeyspaceRecords(t *testing.T) {
+ ksDef := &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ SidecarDbName: "_vt",
+ }
+ tests := []struct {
+ name string
+ setupFunc func(t *testing.T, ts *topo.Server)
+ updateFunc func(t *testing.T, ts *topo.Server)
+ wantInitRecords []*topo.WatchKeyspacePrefixData
+ wantChanRecords []*topo.WatchKeyspacePrefixData
+ }{
+ {
+ name: "Update Durability Policy in 1 Keyspace",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ ki, err := ts.GetKeyspace(context.Background(), "ks")
+ require.NoError(t, err)
+ ki.DurabilityPolicy = policy.DurabilityCrossCell
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+ require.NoError(t, err)
+ defer unlock(&err)
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ },
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilityCrossCell,
+ SidecarDbName: "_vt",
+ }, nil),
+ },
+ },
+ },
+ {
+ name: "New Keyspace Created",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{},
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ },
+ },
+ {
+ name: "New Shard Created",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateShard(context.Background(), "ks", "-")
+ require.NoError(t, err)
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ },
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ ShardInfo: topo.NewShardInfo("ks", "-", &topodatapb.Shard{
+ KeyRange: &topodatapb.KeyRange{},
+ IsPrimaryServing: true,
+ }, nil),
+ },
+ },
+ },
+ {
+ name: "Keyspace Deleted",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.DeleteKeyspace(context.Background(), "ks")
+ require.NoError(t, err)
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ },
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ Err: topo.NewError(topo.NoNode, "keyspaces/ks/Keyspace"),
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{}, nil),
+ },
+ },
+ },
+ {
+ name: "Update KeyspaceType in 1 Keyspace",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ ki, err := ts.GetKeyspace(context.Background(), "ks")
+ require.NoError(t, err)
+ ki.KeyspaceType = topodatapb.KeyspaceType_SNAPSHOT
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+ require.NoError(t, err)
+ defer unlock(&err)
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ },
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ SidecarDbName: "_vt",
+ }, nil),
+ },
+ },
+ },
+ {
+ name: "Multiple updates in multiple keyspaces",
+ setupFunc: func(t *testing.T, ts *topo.Server) {
+ err := ts.CreateKeyspace(context.Background(), "ks", ksDef)
+ require.NoError(t, err)
+ err = ts.CreateKeyspace(context.Background(), "ks2", ksDef)
+ require.NoError(t, err)
+ err = ts.CreateShard(context.Background(), "ks2", "-")
+ require.NoError(t, err)
+ },
+ updateFunc: func(t *testing.T, ts *topo.Server) {
+ func() {
+ ki, err := ts.GetKeyspace(context.Background(), "ks")
+ require.NoError(t, err)
+ ki.KeyspaceType = topodatapb.KeyspaceType_SNAPSHOT
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks", "test")
+ require.NoError(t, err)
+ defer unlock(&err)
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ }()
+ func() {
+ ki, err := ts.GetKeyspace(context.Background(), "ks2")
+ require.NoError(t, err)
+ ki.DurabilityPolicy = policy.DurabilityCrossCell
+ ctx, unlock, err := ts.LockKeyspace(context.Background(), "ks2", "test")
+ require.NoError(t, err)
+ defer unlock(&err)
+ err = ts.UpdateKeyspace(ctx, ki)
+ require.NoError(t, err)
+ }()
+ func() {
+ _, err := ts.UpdateShardFields(context.Background(), "ks2", "-", func(info *topo.ShardInfo) error {
+ info.IsPrimaryServing = false
+ return nil
+ })
+ require.NoError(t, err)
+ }()
+ },
+ wantInitRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", ksDef, nil),
+ },
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks2", ksDef, nil),
+ },
+ {
+ ShardInfo: topo.NewShardInfo("ks2", "-", &topodatapb.Shard{
+ KeyRange: &topodatapb.KeyRange{},
+ IsPrimaryServing: true,
+ }, nil),
+ },
+ },
+ wantChanRecords: []*topo.WatchKeyspacePrefixData{
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks", &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_SNAPSHOT,
+ DurabilityPolicy: policy.DurabilitySemiSync,
+ SidecarDbName: "_vt",
+ }, nil),
+ },
+ {
+ KeyspaceInfo: topo.NewKeyspaceInfo("ks2", &topodatapb.Keyspace{
+ KeyspaceType: topodatapb.KeyspaceType_NORMAL,
+ DurabilityPolicy: policy.DurabilityCrossCell,
+ SidecarDbName: "_vt",
+ }, nil),
+ },
+ {
+ ShardInfo: topo.NewShardInfo("ks2", "-", &topodatapb.Shard{
+ KeyRange: &topodatapb.KeyRange{},
+ IsPrimaryServing: false,
+ }, nil),
+ },
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ // Create a memory topo server for the test.
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ ts := memorytopo.NewServer(ctx)
+ defer ts.Close()
+
+ // Do the initial setup before starting the watch.
+ tt.setupFunc(t, ts)
+
+ // Start the watch and verify the initial records received.
+ watchCtx, watchCancel := context.WithCancel(ctx)
+ defer watchCancel()
+ initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
+ require.NoError(t, err)
+ elementsMatchFunc(t, tt.wantInitRecords, initRecords, watchKeyspacePrefixDataMatches)
+
+ // We start a go routine to collect all the records from the channel.
+ wg := sync.WaitGroup{}
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ idx := 0
+ for data := range ch {
+ if topo.IsErrType(data.Err, topo.Interrupted) {
+ continue
+ }
+ require.GreaterOrEqual(t, len(tt.wantChanRecords), idx+1)
+ require.True(t, watchKeyspacePrefixDataMatches(tt.wantChanRecords[idx], data))
+ idx++
+ // Stop the watch after we have verified we have received all required updates.
+ if idx == len(tt.wantChanRecords) {
+ watchCancel()
+ }
+ }
+ }()
+
+ // Update the records and verify the records received on the channel.
+ tt.updateFunc(t, ts)
+ if len(tt.wantChanRecords) == 0 {
+ // If there are no records to verify, we can stop the watch.
+ watchCancel()
+ }
+ // Wait for the go routine to finish.
+ wg.Wait()
+ })
+ }
+}
+
+// elementsMatchFunc checks if two slices have the same elements (ignoring order),
+// using a custom equality function to compare elements.
+func elementsMatchFunc[T any](t *testing.T, expected, actual []T, equalFn func(a, b T) bool) {
+ require.Len(t, actual, len(expected))
+
+ visited := make([]bool, len(actual))
+ for _, exp := range expected {
+ found := false
+ for i, act := range actual {
+ if visited[i] {
+ continue
+ }
+ if equalFn(exp, act) {
+ visited[i] = true
+ found = true
+ break
+ }
+ }
+ require.True(t, found, "Expected element %+v not found in actual slice", exp)
+ }
+}
+
+// watchKeyspacePrefixDataMatches is a helper function to check equality of two topo.WatchKeyspacePrefixData.
+func watchKeyspacePrefixDataMatches(a, b *topo.WatchKeyspacePrefixData) bool {
+ if a == nil || b == nil {
+ return a == b
+ }
+ if !errors.Is(a.Err, b.Err) {
+ return false
+ }
+ if a.KeyspaceInfo == nil || b.KeyspaceInfo == nil {
+ if a.KeyspaceInfo != b.KeyspaceInfo {
+ return false
+ }
+ } else {
+ if !proto.Equal(a.KeyspaceInfo.Keyspace, b.KeyspaceInfo.Keyspace) {
+ return false
+ }
+ if a.KeyspaceInfo.KeyspaceName() != b.KeyspaceInfo.KeyspaceName() {
+ return false
+ }
+ }
+
+ if a.ShardInfo == nil || b.ShardInfo == nil {
+ if a.ShardInfo != b.ShardInfo {
+ return false
+ }
+ } else {
+ if !proto.Equal(a.ShardInfo.Shard, b.ShardInfo.Shard) {
+ return false
+ }
+ if a.ShardInfo.Keyspace() != b.ShardInfo.Keyspace() {
+ return false
+ }
+ if a.ShardInfo.ShardName() != b.ShardInfo.ShardName() {
+ return false
+ }
+ }
+
+ return true
+}
diff --git a/go/vt/topo/memorytopo/memorytopo.go b/go/vt/topo/memorytopo/memorytopo.go
index 9d703a2869a..2f1bfed34fb 100644
--- a/go/vt/topo/memorytopo/memorytopo.go
+++ b/go/vt/topo/memorytopo/memorytopo.go
@@ -23,6 +23,7 @@ import (
"context"
"errors"
"math/rand/v2"
+ "path"
"regexp"
"strings"
"sync"
@@ -218,6 +219,13 @@ func (n *node) isDirectory() bool {
return n.children != nil
}
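+
+// getFullPath returns the node's path from the root of the tree by joining
+// the names of all of its ancestors.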
+func (n *node) getFullPath() string {
+ if n.parent == nil {
+ return n.name
+ }
+ return path.Join(n.parent.getFullPath(), n.name)
+}
+
func (n *node) recurseContents(callback func(n *node)) {
if n.isDirectory() {
for _, child := range n.children {
diff --git a/go/vt/topo/memorytopo/watch.go b/go/vt/topo/memorytopo/watch.go
index dcb90a8f0ef..a8a80f20a0f 100644
--- a/go/vt/topo/memorytopo/watch.go
+++ b/go/vt/topo/memorytopo/watch.go
@@ -104,7 +104,7 @@ func (c *Conn) WatchRecursive(ctx context.Context, dirpath string) ([]*topo.Watc
var initialwd []*topo.WatchDataRecursive
n.recurseContents(func(n *node) {
initialwd = append(initialwd, &topo.WatchDataRecursive{
- Path: n.name,
+ Path: n.getFullPath(),
WatchData: topo.WatchData{
Contents: n.contents,
Version: NodeVersion(n.version),
diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go
index 6bd4aae5b62..915bcd18e3c 100644
--- a/go/vt/topo/shard_test.go
+++ b/go/vt/topo/shard_test.go
@@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) {
},
valid: true,
},
+ {
+ name: "-",
+ expectedRange: &topodatapb.KeyRange{
+ Start: []byte{},
+ End: []byte{},
+ },
+ valid: true,
+ },
{
name: "40-80",
expectedRange: &topodatapb.KeyRange{
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go
index 0dd17cb65fd..5beb3c8eb3b 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go
@@ -18,9 +18,7 @@ package logic
import (
"context"
- "sort"
- "strings"
- "sync"
+ "time"
"vitess.io/vitess/go/vt/log"
@@ -28,62 +26,74 @@ import (
"vitess.io/vitess/go/vt/vtorc/inst"
)
-// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
-func RefreshAllKeyspacesAndShards(ctx context.Context) error {
- var keyspaces []string
- if len(clustersToWatch) == 0 { // all known keyspaces
- ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
- defer cancel()
- var err error
- // Get all the keyspaces
- keyspaces, err = ts.GetKeyspaces(ctx)
- if err != nil {
- return err
- }
- } else {
- // Parse input and build list of keyspaces
- for _, ks := range clustersToWatch {
- if strings.Contains(ks, "/") {
- // This is a keyspace/shard specification
- input := strings.Split(ks, "/")
- keyspaces = append(keyspaces, input[0])
- } else {
- // Assume this is a keyspace
- keyspaces = append(keyspaces, ks)
+// setupKeyspaceAndShardRecordsWatch sets up a watch on all keyspace and shard records.
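+// The watch runs in a background goroutine and is re-established after a
+// short backoff whenever it fails, until the context is cancelled.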
+func setupKeyspaceAndShardRecordsWatch(ctx context.Context, ts *topo.Server) {
+ go func() {
+ for {
+ if ctx.Err() != nil {
+ return
+ }
+ initialRecs, updateChan, err := ts.WatchAllKeyspaceAndShardRecords(ctx)
+ if err != nil {
+ if ctx.Err() != nil {
+ return
+ }
+				// Back off for a while and then try setting up the watch again.
+ time.Sleep(10 * time.Second)
+ continue
+ }
+ for _, rec := range initialRecs {
+ err = processKeyspacePrefixWatchUpdate(rec)
+ if err != nil {
+ log.Errorf("failed to process initial keyspace/shard record: %+v", err)
+ break
+ }
+ }
+ if err != nil {
+ continue
+ }
+
+ for data := range updateChan {
+ err = processKeyspacePrefixWatchUpdate(data)
+ if err != nil {
+ log.Errorf("failed to process keyspace/shard record update: %+v", err)
+ break
+ }
}
}
- if len(keyspaces) == 0 {
- log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
- return nil
- }
+ }()
+}
+
+// processKeyspacePrefixWatchUpdate processes a keyspace prefix watch update.
+func processKeyspacePrefixWatchUpdate(wd *topo.WatchKeyspacePrefixData) error {
+ // We ignore the error in the watch data.
+ // If there is an error that closes the watch, then
+ // we will open it again.
+ if wd.Err != nil {
+ return nil
}
+ if wd.KeyspaceInfo != nil {
+ return processKeyspaceUpdate(wd)
+ } else if wd.ShardInfo != nil {
+ return processShardUpdate(wd)
+ }
+	return nil
+}
- // Sort the list of keyspaces.
- // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
- sort.Strings(keyspaces)
- refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
- defer refreshCancel()
- var wg sync.WaitGroup
- for idx, keyspace := range keyspaces {
- // Check if the current keyspace name is the same as the last one.
- // If it is, then we know we have already refreshed its information.
- // We do not need to do it again.
- if idx != 0 && keyspace == keyspaces[idx-1] {
- continue
- }
- wg.Add(2)
- go func(keyspace string) {
- defer wg.Done()
- _ = refreshKeyspaceHelper(refreshCtx, keyspace)
- }(keyspace)
- go func(keyspace string) {
- defer wg.Done()
- _ = refreshAllShards(refreshCtx, keyspace)
- }(keyspace)
+// processShardUpdate processes a shard update.
+func processShardUpdate(wd *topo.WatchKeyspacePrefixData) error {
+ if !shardPartOfWatch(wd.ShardInfo.Keyspace(), wd.ShardInfo.GetKeyRange()) {
+ return nil
}
- wg.Wait()
+ return inst.SaveShard(wd.ShardInfo)
+}
- return nil
+// processKeyspaceUpdate processes a keyspace update.
+func processKeyspaceUpdate(wd *topo.WatchKeyspacePrefixData) error {
+ if !keyspacePartOfWatch(wd.KeyspaceInfo.KeyspaceName()) {
+ return nil
+ }
+ return inst.SaveKeyspace(wd.KeyspaceInfo)
}
// RefreshKeyspaceAndShard refreshes the keyspace record and shard record for the given keyspace and shard.
@@ -123,28 +133,6 @@ func refreshKeyspaceHelper(ctx context.Context, keyspaceName string) error {
return err
}
-// refreshAllShards refreshes all the shard records in the given keyspace.
-func refreshAllShards(ctx context.Context, keyspaceName string) error {
- shardInfos, err := ts.FindAllShardsInKeyspace(ctx, keyspaceName, &topo.FindAllShardsInKeyspaceOptions{
- // Fetch shard records concurrently to speed up discovery. A typical
- // Vitess cluster will have 1-3 vtorc instances deployed, so there is
- // little risk of a thundering herd.
- Concurrency: 8,
- })
- if err != nil {
- log.Error(err)
- return err
- }
- for _, shardInfo := range shardInfos {
- err = inst.SaveShard(shardInfo)
- if err != nil {
- log.Error(err)
- return err
- }
- }
- return nil
-}
-
// refreshSingleShardHelper is a helper function that refreshes the shard record of the given keyspace/shard.
func refreshSingleShardHelper(ctx context.Context, keyspaceName string, shardName string) error {
shardInfo, err := ts.GetShard(ctx, keyspaceName, shardName)
diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
index 8218af45db6..a1a80b5138c 100644
--- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
+++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go
@@ -20,10 +20,12 @@ import (
"context"
"fmt"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
+ "vitess.io/vitess/go/vt/external/golib/sqlutils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/memorytopo"
@@ -52,7 +54,86 @@ var (
}
)
-func TestRefreshAllKeyspaces(t *testing.T) {
+// TestSetupKeyspaceAndShardRecordsWatch tests that the watch is setup correctly for keyspace and shard records.
+func TestSetupKeyspaceAndShardRecordsWatch(t *testing.T) {
+ // Store the old flags and restore on test completion
+ oldTs := ts
+ oldClustersToWatch := clustersToWatch
+ defer func() {
+ ts = oldTs
+ clustersToWatch = oldClustersToWatch
+ }()
+
+ db.ClearVTOrcDatabase()
+ defer func() {
+ db.ClearVTOrcDatabase()
+ }()
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ ts = memorytopo.NewServer(ctx, "zone1")
+
+ for _, ks := range []string{"ks1", "ks2"} {
+ err := ts.CreateKeyspace(ctx, ks, keyspaceDurabilityNone)
+ require.NoError(t, err)
+ for idx, sh := range []string{"-80", "80-"} {
+ err = ts.CreateShard(ctx, ks, sh)
+ require.NoError(t, err)
+ _, err = ts.UpdateShardFields(ctx, ks, sh, func(si *topo.ShardInfo) error {
+ si.PrimaryAlias = &topodatapb.TabletAlias{
+ Cell: fmt.Sprintf("zone_%v", ks),
+ Uid: uint32(100 + idx),
+ }
+ return nil
+ })
+ require.NoError(t, err)
+ }
+ }
+
+ // Set up the keyspace and shard watch.
+ setupKeyspaceAndShardRecordsWatch(ctx, ts)
+ waitForKeyspaceCount(t, 2)
+ // Verify that we only have ks1 and ks2 in vtorc's db.
+ verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
+ verifyPrimaryAlias(t, "ks1", "-80", "zone_ks1-0000000100", "")
+ verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
+ verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
+ verifyKeyspaceInfo(t, "ks3", nil, "keyspace not found")
+ verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
+ verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")
+
+ // Update primary on the shard.
+ _, err := ts.UpdateShardFields(ctx, "ks1", "-80", func(si *topo.ShardInfo) error {
+ si.PrimaryAlias.Cell = "updated_new_cell"
+ return nil
+ })
+ require.NoError(t, err)
+
+ // Delete a shard.
+	// We will verify that the shard record is not deleted in VTOrc,
+	// since delete updates are ignored for now.
+ err = ts.DeleteShard(ctx, "ks2", "80-")
+ require.NoError(t, err)
+
+ // Create a new keyspace record.
+ err = ts.CreateKeyspace(ctx, "ks3", keyspaceDurabilitySemiSync)
+ require.NoError(t, err)
+
+ // Check that the watch sees these updates.
+ waitForKeyspaceCount(t, 3)
+	// Verify that the updates to ks1, ks2 and ks3 are reflected in vtorc's db.
+ verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
+ verifyPrimaryAlias(t, "ks1", "-80", "updated_new_cell-0000000100", "")
+ verifyKeyspaceInfo(t, "ks2", keyspaceDurabilityNone, "")
+ verifyPrimaryAlias(t, "ks2", "80-", "zone_ks2-0000000101", "")
+ verifyKeyspaceInfo(t, "ks3", keyspaceDurabilitySemiSync, "")
+ verifyPrimaryAlias(t, "ks3", "80-", "", "shard not found")
+ verifyKeyspaceInfo(t, "ks4", nil, "keyspace not found")
+}
+
+// TestInitialSetupOfWatch tests that the initial setup of the watch for shards
+// and keyspaces loads the latest information from the topo server.
+func TestInitialSetupOfWatch(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
oldClustersToWatch := clustersToWatch
@@ -93,7 +174,11 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
- require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
+ initializeShardsToWatch()
+ watchCtx, watchCancel := context.WithCancel(context.Background())
+ setupKeyspaceAndShardRecordsWatch(watchCtx, ts)
+ waitForKeyspaceCount(t, 2)
+ watchCancel()
// Verify that we only have ks1 and ks3 in vtorc's db.
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilityNone, "")
@@ -106,9 +191,13 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to watch all keyspaces
clustersToWatch = nil
+ initializeShardsToWatch()
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync)
- require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
+ watchCtx, watchCancel = context.WithCancel(context.Background())
+ setupKeyspaceAndShardRecordsWatch(watchCtx, ts)
+ waitForKeyspaceCount(t, 4)
+ watchCancel()
// Verify that all the keyspaces are correctly reloaded
verifyKeyspaceInfo(t, "ks1", keyspaceDurabilitySemiSync, "")
@@ -119,7 +208,30 @@ func TestRefreshAllKeyspaces(t *testing.T) {
verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "")
verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "")
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")
+}
+// waitForKeyspaceCount waits for the keyspace count to match the expected value.
+func waitForKeyspaceCount(t *testing.T, count int) {
+ t.Helper()
+ timeout := time.After(10 * time.Second)
+ for {
+ select {
+ case <-timeout:
+ t.Errorf("timed out waiting for keyspace count")
+ return
+ default:
+ }
+ var curCount = 0
+ err := db.QueryVTOrcRowsMap("select count(*) as c from vitess_keyspace", func(row sqlutils.RowMap) error {
+ curCount = row.GetInt("c")
+ return nil
+ })
+ require.NoError(t, err)
+ if curCount == count {
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
}
func TestRefreshKeyspace(t *testing.T) {
diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go
index eb10bb2a667..b06177d1b1a 100644
--- a/go/vt/vtorc/logic/tablet_discovery.go
+++ b/go/vt/vtorc/logic/tablet_discovery.go
@@ -32,11 +32,11 @@ import (
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
+ "vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
- "vitess.io/vitess/go/vt/vtorc/config"
"vitess.io/vitess/go/vt/vtorc/db"
"vitess.io/vitess/go/vt/vtorc/inst"
"vitess.io/vitess/go/vt/vttablet/tmclient"
@@ -48,8 +48,10 @@ var (
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
- shardsToWatch map[string][]string
- shardsToWatchMu sync.Mutex
+	// shardsToWatch stores the key ranges of all shards that need to be watched,
+	// keyed by keyspace. It is populated by parsing the `--clusters_to_watch` flag.
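+	// For example, --clusters_to_watch="ks1,ks2/-80" yields the complete key
+	// range for ks1 and the single range -80 for ks2.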
+ shardsToWatch map[string][]*topodatapb.KeyRange
// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
@@ -57,18 +59,18 @@ var (
// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
- fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
+ fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}
-// updateShardsToWatch parses the --clusters_to_watch flag-value
+// initializeShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
-func updateShardsToWatch() {
+func initializeShardsToWatch() {
+ shardsToWatch = make(map[string][]*topodatapb.KeyRange)
if len(clustersToWatch) == 0 {
return
}
- newShardsToWatch := make(map[string][]string, 0)
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
@@ -77,39 +79,65 @@ func updateShardsToWatch() {
log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err)
continue
}
- newShardsToWatch[k] = append(newShardsToWatch[k], s)
- } else {
- ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
- defer cancel()
- // Assume this is a keyspace and find all shards in keyspace.
- // Remove trailing slash if exists.
- ks = strings.TrimSuffix(ks, "/")
- shards, err := ts.GetShardNames(ctx, ks)
+ // Parse the shard name into key range value.
+ _, keyRange, err := topo.ValidateShardName(s)
if err != nil {
- // Log the err and continue.
- log.Errorf("Error fetching shards for keyspace: %v", ks)
- continue
+				log.Errorf("Could not parse shard name %q: %+v", s, err)
+				continue
}
- if len(shards) == 0 {
- log.Errorf("Topo has no shards for ks: %v", ks)
- continue
+			// If the key range is nil, the user is not using range-based sharding,
+			// so we watch all the shards of the keyspace.
+ if keyRange == nil {
+ keyRange = key.NewCompleteKeyRange()
}
- newShardsToWatch[ks] = shards
+ shardsToWatch[k] = append(shardsToWatch[k], keyRange)
+ } else {
+ // Remove trailing slash if exists.
+ ks = strings.TrimSuffix(ks, "/")
+ // We store the entire range of key range if nothing is specified.
+ shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()}
}
}
- if len(newShardsToWatch) == 0 {
- log.Error("No keyspace/shards to watch")
- return
+
+ if len(shardsToWatch) == 0 {
+		log.Error("No valid keyspace/shards found in --clusters_to_watch; defaulting to watching all keyspaces")
}
+}
- shardsToWatchMu.Lock()
- defer shardsToWatchMu.Unlock()
- shardsToWatch = newShardsToWatch
+// keyspacePartOfWatch checks if the given keyspace is part of the watch list.
+func keyspacePartOfWatch(keyspace string) bool {
+ // If we are watching all keyspaces, then we want to watch this keyspace too.
+ if len(shardsToWatch) == 0 {
+ return true
+ }
+ _, shouldWatch := shardsToWatch[keyspace]
+ return shouldWatch
+}
+
+// shardPartOfWatch checks if the given keyspace/key range is part of the watch list.
+func shardPartOfWatch(keyspace string, keyRange *topodatapb.KeyRange) bool {
+	// If we are watching all keyspaces, then we want to watch this shard too.
+ if len(shardsToWatch) == 0 {
+ return true
+ }
+ shardRanges, ok := shardsToWatch[keyspace]
+	// If we don't have the keyspace in our map, then this shard
+	// doesn't need to be watched.
+ if !ok {
+ return false
+ }
+
+ // Check if the key range is part of the shard ranges we are watching.
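+	// For example, a watched range of -80 contains the resharded ranges -40
+	// and 40-80, so shards created by a reshard keep being watched.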
+ for _, shardRange := range shardRanges {
+ if key.KeyRangeContainsKeyRange(shardRange, keyRange) {
+ return true
+ }
+ }
+ return false
}
-// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
-// channel for polling.
+// OpenDiscoveryFromTopo opens the vitess topo if enabled, initializes the
+// shards to watch, and sets up the keyspace and shard records watch.
-func OpenTabletDiscovery() <-chan time.Time {
+func OpenDiscoveryFromTopo() {
ts = topo.Open()
tmc = inst.InitializeTMC()
// Clear existing cache and perform a new refresh.
@@ -117,15 +145,15 @@ func OpenTabletDiscovery() <-chan time.Time {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
- updateShardsToWatch()
+ initializeShardsToWatch()
+ setupKeyspaceAndShardRecordsWatch(context.Background(), ts)
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
- if err := refreshAllInformation(ctx); err != nil {
+ if err := refreshTopoTick(ctx); err != nil {
log.Errorf("failed to initialize topo information: %+v", err)
}
- return time.Tick(config.GetTopoInformationRefreshDuration()) //nolint SA1015: using time.Tick leaks the underlying ticker
}
// getAllTablets gets all tablets from all cells using a goroutine per cell.
@@ -179,16 +207,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
- shardsToWatchMu.Lock()
- defer shardsToWatchMu.Unlock()
for _, t := range tablets {
- if len(shardsToWatch) > 0 {
- _, ok := shardsToWatch[t.Tablet.Keyspace]
- if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
- continue // filter
- }
+ if shardPartOfWatch(t.Tablet.GetKeyspace(), t.Tablet.GetKeyRange()) {
+ matchedTablets = append(matchedTablets, t)
}
- matchedTablets = append(matchedTablets, t)
}
}()
diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go
index 54284e8a017..90cd684e125 100644
--- a/go/vt/vtorc/logic/tablet_discovery_test.go
+++ b/go/vt/vtorc/logic/tablet_discovery_test.go
@@ -30,6 +30,7 @@ import (
"google.golang.org/protobuf/proto"
"vitess.io/vitess/go/vt/external/golib/sqlutils"
+ "vitess.io/vitess/go/vt/key"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo"
@@ -102,60 +103,209 @@ var (
}
)
-func TestUpdateShardsToWatch(t *testing.T) {
+func TestPartOfWatch(t *testing.T) {
oldClustersToWatch := clustersToWatch
- oldTs := ts
defer func() {
clustersToWatch = oldClustersToWatch
shardsToWatch = nil
- ts = oldTs
}()
- // Create a memory topo-server and create the keyspace and shard records
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
+ testCases := []struct {
+ in []string
+ tablet *topodatapb.Tablet
+ expectedShardPartOfWatch bool
+ expectedKsPartOfWatch bool
+ }{
+ {
+ in: []string{},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{keyspace},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{keyspace + "/-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{keyspace + "/" + shard},
+ tablet: &topodatapb.Tablet{
+ Keyspace: keyspace,
+ Shard: shard,
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}),
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}),
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks/-70", "ks/70-"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}),
+ },
+ expectedShardPartOfWatch: false,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks/50-70"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}),
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}),
+ },
+ expectedShardPartOfWatch: true,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}),
+ },
+ expectedShardPartOfWatch: false,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "ks",
+ KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}),
+ },
+ expectedShardPartOfWatch: false,
+ expectedKsPartOfWatch: true,
+ },
+ {
+ in: []string{"ks2/-70", "ks2/70-", "ks/-80"},
+ tablet: &topodatapb.Tablet{
+ Keyspace: "unknownKs",
+ KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}),
+ },
+ expectedShardPartOfWatch: false,
+ expectedKsPartOfWatch: false,
+ },
+ }
- ts = memorytopo.NewServer(ctx, cell1)
- _, err := ts.GetOrCreateShard(context.Background(), keyspace, shard)
- require.NoError(t, err)
+ for _, tt := range testCases {
+ t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) {
+ clustersToWatch = tt.in
+ initializeShardsToWatch()
+ assert.Equal(t, tt.expectedShardPartOfWatch, shardPartOfWatch(tt.tablet.GetKeyspace(), tt.tablet.GetKeyRange()))
+ assert.Equal(t, tt.expectedKsPartOfWatch, keyspacePartOfWatch(tt.tablet.GetKeyspace()))
+ })
+ }
+}
+
+// TestInitializeShardsToWatch tests that we initialize the shardsToWatch map correctly
+// using the `--clusters_to_watch` flag.
+func TestInitializeShardsToWatch(t *testing.T) {
+ oldClustersToWatch := clustersToWatch
+ defer func() {
+ clustersToWatch = oldClustersToWatch
+ shardsToWatch = nil
+ }()
testCases := []struct {
in []string
- expected map[string][]string
+ expected map[string][]*topodatapb.KeyRange
}{
{
in: []string{},
- expected: nil,
+ expected: map[string][]*topodatapb.KeyRange{},
},
{
- in: []string{""},
- expected: map[string][]string{},
+ in: []string{"unknownKs"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "unknownKs": {
+ key.NewCompleteKeyRange(),
+ },
+ },
},
{
in: []string{"test/-"},
- expected: map[string][]string{
- "test": {"-"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "test": {
+ key.NewCompleteKeyRange(),
+ },
},
},
{
in: []string{"test/-", "test2/-80", "test2/80-"},
- expected: map[string][]string{
- "test": {"-"},
- "test2": {"-80", "80-"},
+ expected: map[string][]*topodatapb.KeyRange{
+ "test": {
+ key.NewCompleteKeyRange(),
+ },
+ "test2": {
+ key.NewKeyRange([]byte{}, []byte{0x80}),
+ key.NewKeyRange([]byte{0x80}, []byte{}),
+ },
},
},
{
- // confirm shards fetch from topo
+ // known keyspace
in: []string{keyspace},
- expected: map[string][]string{
- keyspace: {shard},
+ expected: map[string][]*topodatapb.KeyRange{
+ keyspace: {
+ key.NewCompleteKeyRange(),
+ },
},
},
{
- // confirm shards fetch from topo when keyspace has trailing-slash
+ // keyspace with trailing-slash
in: []string{keyspace + "/"},
- expected: map[string][]string{
- keyspace: {shard},
+ expected: map[string][]*topodatapb.KeyRange{
+ keyspace: {
+ key.NewCompleteKeyRange(),
+ },
},
},
}
@@ -163,10 +313,10 @@ func TestUpdateShardsToWatch(t *testing.T) {
for _, testCase := range testCases {
t.Run(strings.Join(testCase.in, ","), func(t *testing.T) {
defer func() {
- shardsToWatch = make(map[string][]string, 0)
+ shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0)
}()
clustersToWatch = testCase.in
- updateShardsToWatch()
+ initializeShardsToWatch()
require.Equal(t, testCase.expected, shardsToWatch)
})
}
diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go
index 1fde6e31c0d..5ed1e605015 100644
--- a/go/vt/vtorc/logic/vtorc.go
+++ b/go/vt/vtorc/logic/vtorc.go
@@ -24,7 +24,6 @@ import (
"github.com/patrickmn/go-cache"
"github.com/sjmudd/stopwatch"
- "golang.org/x/sync/errgroup"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
@@ -255,10 +254,11 @@ func ContinuousDiscovery() {
go handleDiscoveryRequests()
+ OpenDiscoveryFromTopo()
healthTick := time.Tick(config.HealthPollSeconds * time.Second)
caretakingTick := time.Tick(time.Minute)
recoveryTick := time.Tick(config.GetRecoveryPollDuration())
- tabletTopoTick := OpenTabletDiscovery()
+ tabletTopoTick := time.Tick(config.GetTopoInformationRefreshDuration())
var recoveryEntrance int64
var snapshotTopologiesTick <-chan time.Time
if config.GetSnapshotTopologyInterval() > 0 {
@@ -308,7 +308,7 @@ func ContinuousDiscovery() {
}()
case <-tabletTopoTick:
ctx, cancel := context.WithTimeout(context.Background(), config.GetTopoInformationRefreshDuration())
- if err := refreshAllInformation(ctx); err != nil {
+ if err := refreshTopoTick(ctx); err != nil {
log.Errorf("failed to refresh topo information: %+v", err)
}
cancel()
@@ -316,29 +316,9 @@ func ContinuousDiscovery() {
}
}
-// refreshAllInformation refreshes both shard and tablet information. This is meant to be run on tablet topo ticks.
-func refreshAllInformation(ctx context.Context) error {
- // Create an errgroup
- eg, ctx := errgroup.WithContext(ctx)
-
- // Refresh all keyspace information.
- eg.Go(func() error {
- return RefreshAllKeyspacesAndShards(ctx)
- })
-
- // Refresh shards to watch.
- eg.Go(func() error {
- updateShardsToWatch()
- return nil
- })
-
- // Refresh all tablets.
- eg.Go(func() error {
- return refreshAllTablets(ctx)
- })
-
- // Wait for both the refreshes to complete
- err := eg.Wait()
+// refreshTopoTick refreshes tablet information from the topo server on a time tick.
+func refreshTopoTick(ctx context.Context) error {
+ err := refreshAllTablets(ctx)
if err == nil {
process.FirstDiscoveryCycleComplete.Store(true)
}
diff --git a/go/vt/vtorc/logic/vtorc_test.go b/go/vt/vtorc/logic/vtorc_test.go
index edd8141e8b7..8bb34f94628 100644
--- a/go/vt/vtorc/logic/vtorc_test.go
+++ b/go/vt/vtorc/logic/vtorc_test.go
@@ -61,7 +61,7 @@ func waitForLocksReleaseAndGetTimeWaitedFor() time.Duration {
return time.Since(start)
}
-func TestRefreshAllInformation(t *testing.T) {
+func TestRefreshTopoTick(t *testing.T) {
// Store the old flags and restore on test completion
oldTs := ts
defer func() {
@@ -85,7 +85,7 @@ func TestRefreshAllInformation(t *testing.T) {
// Test error
ctx, cancel := context.WithCancel(context.Background())
cancel() // cancel context to simulate timeout
- require.Error(t, refreshAllInformation(ctx))
+ require.Error(t, refreshTopoTick(ctx))
require.False(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.False(t, discoveredOnce)
@@ -93,7 +93,7 @@ func TestRefreshAllInformation(t *testing.T) {
// Test success
ctx2, cancel2 := context.WithCancel(context.Background())
defer cancel2()
- require.NoError(t, refreshAllInformation(ctx2))
+ require.NoError(t, refreshTopoTick(ctx2))
require.True(t, process.FirstDiscoveryCycleComplete.Load())
_, discoveredOnce = process.HealthTest()
require.True(t, discoveredOnce)