Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch Keyspace Records in VTOrc instead of polling them #17553

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog/22.0/22.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
- **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
Expand Down Expand Up @@ -133,6 +134,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit

The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].

### <a id="key-range-vtorc"/>KeyRanges in `--clusters_to_watch` in VTOrc</a>
VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This is useful in scenarios where you don't need to restart a VTOrc instance if you run a reshard.
For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the KeyRange `-80`. If a reshard is run and, `-80` is split into new shards `-40`, and `40-80`, the VTOrc instance will automatically start watching the new shard without needing a restart.
The users can still continue to specify exact key ranges too, and the new feature is backward compatible.

### <a id="query-logs"/>Support for Filtering Query logs on Error</a>

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
Expand Down
103 changes: 103 additions & 0 deletions go/test/endtoend/topotest/consul/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -221,6 +223,107 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execute(t *testing.T, conn *mysql.Conn, query string) *sqltypes.Result {
t.Helper()
qr, err := conn.ExecuteFetch(query, 1000, true)
Expand Down
103 changes: 103 additions & 0 deletions go/test/endtoend/topotest/etcd2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -268,6 +270,107 @@ func TestNamedLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondCallerAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
103 changes: 103 additions & 0 deletions go/test/endtoend/topotest/zk2/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (

topoutils "vitess.io/vitess/go/test/endtoend/topotest/utils"
"vitess.io/vitess/go/test/endtoend/utils"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"

"vitess.io/vitess/go/vt/log"

Expand Down Expand Up @@ -197,6 +199,107 @@ func TestKeyspaceLocking(t *testing.T) {
topoutils.WaitForBoolValue(t, &secondThreadLockAcquired, true)
}

// TestWatchAllKeyspaceRecords tests the WatchAllKeyspaceAndShardRecords method.
// We test out different updates and see if we receive the correct update
// from the watch.
func TestWatchAllKeyspaceRecords(t *testing.T) {
// Create the topo server connection.
ts, err := topo.OpenServer(*clusterInstance.TopoFlavorString(), clusterInstance.VtctldProcess.TopoGlobalAddress, clusterInstance.VtctldProcess.TopoGlobalRoot)
require.NoError(t, err)

watchCtx, watchCancel := context.WithCancel(context.Background())
defer watchCancel()
initRecords, ch, err := ts.WatchAllKeyspaceAndShardRecords(watchCtx)
require.NoError(t, err)

// Check that we have the initial records.
// The existing keyspace and shard records should be seen.
require.Len(t, initRecords, 2)
var ksInfo *topo.KeyspaceInfo
var shardInfo *topo.ShardInfo
for _, record := range initRecords {
if record.KeyspaceInfo != nil {
ksInfo = record.KeyspaceInfo
}
if record.ShardInfo != nil {
shardInfo = record.ShardInfo
}
}
require.NotNil(t, ksInfo)
require.NotNil(t, shardInfo)
require.EqualValues(t, KeyspaceName, ksInfo.KeyspaceName())
require.EqualValues(t, KeyspaceName, shardInfo.Keyspace())
require.EqualValues(t, "0", shardInfo.ShardName())

// Create a new keyspace record and see that we receive an update.
newKeyspaceName := "ksTest"
err = ts.CreateKeyspace(context.Background(), newKeyspaceName, &topodatapb.Keyspace{
KeyspaceType: topodatapb.KeyspaceType_NORMAL,
DurabilityPolicy: policy.DurabilitySemiSync,
})
require.NoError(t, err)
defer func() {
err = ts.DeleteKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record := <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilitySemiSync, record.KeyspaceInfo.Keyspace.DurabilityPolicy)

// Creating a shard should also trigger an update.
err = ts.CreateShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Nil(t, record.ShardInfo.Shard.PrimaryAlias)

primaryAlias := &topodatapb.TabletAlias{
Cell: cell,
Uid: 100,
}
// Updating a shard should also trigger an update.
_, err = ts.UpdateShardFields(context.Background(), newKeyspaceName, "-", func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryAlias
return nil
})
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.NotNil(t, record.ShardInfo.Shard.PrimaryAlias)

// Deleting a shard should also trigger an update.
err = ts.DeleteShard(context.Background(), newKeyspaceName, "-")
require.NoError(t, err)
// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.ShardInfo.Keyspace())
require.EqualValues(t, "-", record.ShardInfo.ShardName())
require.Error(t, record.Err)

// Update the keyspace record and see that we receive an update.
func() {
ki, err := ts.GetKeyspace(context.Background(), newKeyspaceName)
require.NoError(t, err)
ctx, unlock, err := ts.LockKeyspace(context.Background(), newKeyspaceName, "TestWatchAllKeyspaceRecords")
require.NoError(t, err)
defer unlock(&err)
ki.DurabilityPolicy = policy.DurabilityCrossCell
err = ts.UpdateKeyspace(ctx, ki)
require.NoError(t, err)
}()

// Wait to receive an update from the watch.
record = <-ch
require.EqualValues(t, newKeyspaceName, record.KeyspaceInfo.KeyspaceName())
require.EqualValues(t, policy.DurabilityCrossCell, record.KeyspaceInfo.Keyspace.DurabilityPolicy)
}

func execMulti(t *testing.T, conn *mysql.Conn, query string) []*sqltypes.Result {
t.Helper()
var res []*sqltypes.Result
Expand Down
8 changes: 8 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}

// NewCompleteKeyRange returns a complete key range.
func NewCompleteKeyRange() *topodatapb.KeyRange {
return &topodatapb.KeyRange{
Start: []byte{},
End: []byte{},
}
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
Expand Down
Loading
Loading