From 059e50d54326a21e56fc09fffd8429efa6924f83 Mon Sep 17 00:00:00 2001
From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com>
Date: Wed, 28 Feb 2024 12:01:11 +0530
Subject: [PATCH] Fix PRS from being blocked because of misbehaving clients
 (#15339)

Signed-off-by: Manan Gupta
---
 changelog/20.0/20.0.0/summary.md              | 12 ++-
 go/flags/endtoend/vtcombo.txt                 |  2 +-
 go/flags/endtoend/vttablet.txt                |  2 +-
 go/test/endtoend/tabletgateway/main_test.go   | 15 ++--
 go/test/endtoend/tabletgateway/vtgate_test.go | 39 ++++++++++
 .../vtgate/transaction/restart/main_test.go   |  3 +
 go/vt/vttablet/tabletserver/query_executor.go |  2 +-
 .../vttablet/tabletserver/requests_waiter.go  | 78 +++++++++++++++++++
 .../tabletserver/requests_waiter_test.go      | 57 ++++++++++++++
 go/vt/vttablet/tabletserver/state_manager.go  | 72 +++++++++--------
 .../tabletserver/state_manager_test.go        |  3 +-
 .../vttablet/tabletserver/tabletenv/config.go |  3 +
 .../tabletserver/tabletenv/config_test.go     |  3 +-
 go/vt/vttablet/tabletserver/tabletserver.go   |  1 +
 .../tabletserver/tabletserver_test.go         |  6 +-
 15 files changed, 253 insertions(+), 45 deletions(-)
 create mode 100644 go/vt/vttablet/tabletserver/requests_waiter.go
 create mode 100644 go/vt/vttablet/tabletserver/requests_waiter_test.go

diff --git a/changelog/20.0/20.0.0/summary.md b/changelog/20.0/20.0.0/summary.md
index bb376c6e721..cd3b9718503 100644
--- a/changelog/20.0/20.0.0/summary.md
+++ b/changelog/20.0/20.0.0/summary.md
@@ -3,6 +3,8 @@
 ### Table of Contents
 
 - **[Major Changes](#major-changes)**
+  - **[Breaking Changes](#breaking-changes)**
+    - [`shutdown_grace_period` Default Change](#shutdown-grace-period-default)
   - **[Query Compatibility](#query-compatibility)**
     - [Vindex Hints](#vindex-hints)
     - [Update with Limit Support](#update-limit)
@@ -16,6 +18,15 @@
 
 ## Major Changes
 
+### Breaking Changes
+
+#### `shutdown_grace_period` Default Change
+
+The `--shutdown_grace_period` flag, which was introduced in v2 with a default of `0 seconds`, has now been changed to default to `3 seconds`.
+This makes reparenting in Vitess resilient to client errors and prevents PlannedReparentShard from timing out.
+
+To preserve the old behaviour, users can set the flag back to `0 seconds`, causing open transactions to never be shut down; in that case, however, they run the risk of PlannedReparentShard calls timing out.
+
 ### Query Compatibility
 
 #### Vindex Hints
@@ -61,7 +72,6 @@ The `--pprof-http` flag, which was introduced in v19 with a default of `true`, h
 This makes HTTP `pprof` endpoints now an *opt-in* feature, rather than opt-out.
 To continue enabling these endpoints, explicitly set `--pprof-http` when starting up Vitess components.
 
-
 ## Minor Changes
 
 ### New Stats
diff --git a/go/flags/endtoend/vtcombo.txt b/go/flags/endtoend/vtcombo.txt
index 04a46d9e754..26e145e6930 100644
--- a/go/flags/endtoend/vtcombo.txt
+++ b/go/flags/endtoend/vtcombo.txt
@@ -319,7 +319,7 @@ Flags:
       --service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice
       --serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state
       --shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s)
-      --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown.
+ --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s) --sql-max-length-errors int truncate queries in error logs to the given length (default unlimited) --sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512) --srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s) diff --git a/go/flags/endtoend/vttablet.txt b/go/flags/endtoend/vttablet.txt index 6ff475badfa..ea023318b01 100644 --- a/go/flags/endtoend/vttablet.txt +++ b/go/flags/endtoend/vttablet.txt @@ -319,7 +319,7 @@ Flags: --service_map strings comma separated list of services to enable (or disable if prefixed with '-') Example: grpc-queryservice --serving_state_grace_period duration how long to pause after broadcasting health to vtgate, before enforcing a new serving state --shard_sync_retry_delay duration delay between retries of updates to keep the tablet and its shard record in sync (default 30s) - --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. + --shutdown_grace_period duration how long to wait for queries and transactions to complete during graceful shutdown. (default 3s) --sql-max-length-errors int truncate queries in error logs to the given length (default unlimited) --sql-max-length-ui int truncate queries in debug UIs to the given length (default 512) (default 512) --srv_topo_cache_refresh duration how frequently to refresh the topology for cached entries (default 1s) diff --git a/go/test/endtoend/tabletgateway/main_test.go b/go/test/endtoend/tabletgateway/main_test.go index da4fe711f64..354be6969d3 100644 --- a/go/test/endtoend/tabletgateway/main_test.go +++ b/go/test/endtoend/tabletgateway/main_test.go @@ -18,6 +18,7 @@ package healthcheck import ( "flag" + "fmt" "os" "testing" @@ -26,11 +27,12 @@ import ( ) var ( - clusterInstance *cluster.LocalProcessCluster - vtParams mysql.ConnParams - keyspaceName = "commerce" - cell = "zone1" - sqlSchema = `create table product( + clusterInstance *cluster.LocalProcessCluster + vtParams mysql.ConnParams + keyspaceName = "commerce" + vtgateGrpcAddress string + cell = "zone1" + sqlSchema = `create table product( sku varbinary(128), description varbinary(128), price bigint, @@ -64,7 +66,7 @@ func TestMain(m *testing.M) { exitCode := func() int { clusterInstance = cluster.NewCluster(cell, "localhost") - clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s"} + clusterInstance.VtTabletExtraArgs = []string{"--health_check_interval", "1s", "--shutdown_grace_period", "3s"} defer clusterInstance.Teardown() // Start topo server @@ -96,6 +98,7 @@ func TestMain(m *testing.M) { Host: clusterInstance.Hostname, Port: clusterInstance.VtgateMySQLPort, } + vtgateGrpcAddress = fmt.Sprintf("%s:%d", clusterInstance.Hostname, clusterInstance.VtgateGrpcPort) return m.Run() }() os.Exit(exitCode) diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index c48aa6c2131..d9cedc04b69 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/utils" vtorcutils "vitess.io/vitess/go/test/endtoend/vtorc/utils" + querypb "vitess.io/vitess/go/vt/proto/query" "vitess.io/vitess/go/vt/proto/topodata" ) @@ -283,6 +284,44 @@ func TestReplicaTransactions(t 
*testing.T) {
 	assert.Equal(t, `[[INT64(1) VARCHAR("email1")] [INT64(2) VARCHAR("email2")]]`, fmt.Sprintf("%v", qr4.Rows), "we are not able to reconnect after restart")
 }
 
+// TestStreamingRPCStuck tests that StreamExecute calls don't get stuck on the vttablets if a client stops reading from a stream.
+func TestStreamingRPCStuck(t *testing.T) {
+	defer cluster.PanicHandler(t)
+	ctx := context.Background()
+	vtConn, err := mysql.Connect(ctx, &vtParams)
+	require.NoError(t, err)
+	defer vtConn.Close()
+
+	// We want the table to have enough rows such that a streaming call returns multiple packets.
+	// Therefore, we insert one row and keep doubling it.
+	utils.Exec(t, vtConn, "insert into customer(email) values('testemail')")
+	for i := 0; i < 15; i++ {
+		// Double the number of rows in the customer table.
+		utils.Exec(t, vtConn, "insert into customer (email) select email from customer")
+	}
+
+	// Connect to vtgate and run a streaming query.
+	vtgateConn, err := cluster.DialVTGate(ctx, t.Name(), vtgateGrpcAddress, "test_user", "")
+	require.NoError(t, err)
+	stream, err := vtgateConn.Session("", &querypb.ExecuteOptions{}).StreamExecute(ctx, "select * from customer", map[string]*querypb.BindVariable{})
+	require.NoError(t, err)
+
+	// We read packets until we see the first set of results. This ensures that the stream is working.
+	for {
+		res, err := stream.Recv()
+		require.NoError(t, err)
+		if res != nil && len(res.Rows) > 0 {
+			// breaking here stops reading from the stream.
+			break
+		}
+	}
+
+	// We simulate a misbehaving client that doesn't read from the stream anymore.
+	// This, however, shouldn't block PlannedReparentShard calls.
+	err = clusterInstance.VtctldClientProcess.PlannedReparentShard(keyspaceName, "0", clusterInstance.Keyspaces[0].Shards[0].Vttablets[1].Alias)
+	require.NoError(t, err)
+}
+
 func getMapFromJSON(JSON map[string]any, key string) map[string]any {
 	result := make(map[string]any)
 	object := reflect.ValueOf(JSON[key])
diff --git a/go/test/endtoend/vtgate/transaction/restart/main_test.go b/go/test/endtoend/vtgate/transaction/restart/main_test.go
index 3c7ac710e9d..de52a3e8870 100644
--- a/go/test/endtoend/vtgate/transaction/restart/main_test.go
+++ b/go/test/endtoend/vtgate/transaction/restart/main_test.go
@@ -60,6 +60,9 @@ func TestMain(m *testing.M) {
 			Name:      keyspaceName,
 			SchemaSQL: schemaSQL,
 		}
+		clusterInstance.VtTabletExtraArgs = append(clusterInstance.VtTabletExtraArgs,
+			"--shutdown_grace_period=0s",
+		)
 		err = clusterInstance.StartUnshardedKeyspace(*keyspace, 1, false)
 		if err != nil {
 			return 1
diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go
index 5690c209ebb..f371d62006c 100644
--- a/go/vt/vttablet/tabletserver/query_executor.go
+++ b/go/vt/vttablet/tabletserver/query_executor.go
@@ -1117,7 +1117,7 @@ func (qre *QueryExecutor) execStreamSQL(conn *connpool.PooledConn, isTransaction
 
 	// Add query detail object into QueryExecutor TableServer list w.r.t if it is a transactional or not. Previously we were adding it
 	// to olapql list regardless but that resulted in problems, where long-running stream queries which can be stateful (or transactional)
-	// weren't getting cleaned up during unserveCommon>handleShutdownGracePeriod in state_manager.go.
+	// weren't getting cleaned up during unserveCommon>terminateAllQueries in state_manager.go.
 	// This change will ensure that long-running streaming stateful queries get gracefully shutdown during ServingTypeChange
 	// once their grace period is over.
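	// (terminateAllQueries calls TerminateAll on both the statelessql and statefulql
	// lists once the grace period elapses, so registering the query detail on the
	// correct list below is what lets that cleanup reach a long-running stream.)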
 	qd := NewQueryDetail(qre.logStats.Ctx, conn.Conn)
diff --git a/go/vt/vttablet/tabletserver/requests_waiter.go b/go/vt/vttablet/tabletserver/requests_waiter.go
new file mode 100644
index 00000000000..39e08f924cc
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/requests_waiter.go
@@ -0,0 +1,78 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tabletserver
+
+import "sync"
+
+// requestsWaiter is used to wait for requests. It stores the count of the requests pending,
+// and also the number of waiters currently waiting. It has a mutex as well to protect its fields.
+type requestsWaiter struct {
+	mu sync.Mutex
+	wg sync.WaitGroup
+	// waitCounter is the number of goroutines that are waiting for wg to be empty.
+	// If this value is greater than zero, then we have to ensure that we don't Add to the requests
+	// to avoid any panics in the wait.
+	waitCounter int
+	// counter is the count of the number of outstanding requests.
+	counter int
+}
+
+// newRequestsWaiter creates a new requestsWaiter.
+func newRequestsWaiter() *requestsWaiter {
+	return &requestsWaiter{}
+}
+
+// Add adds to the requestsWaiter.
+func (r *requestsWaiter) Add(val int) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.counter += val
+	r.wg.Add(val)
+}
+
+// Done subtracts 1 from the requestsWaiter.
+func (r *requestsWaiter) Done() {
+	r.Add(-1)
+}
+
+// addToWaitCounter adds to the waitCounter while being protected by a mutex.
+func (r *requestsWaiter) addToWaitCounter(val int) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.waitCounter += val
+}
+
+// WaitToBeEmpty waits for requests to be empty. It also increments and decrements the waitCounter as required.
+func (r *requestsWaiter) WaitToBeEmpty() {
+	r.addToWaitCounter(1)
+	r.wg.Wait()
+	r.addToWaitCounter(-1)
+}
+
+// GetWaiterCount gets the number of goroutines currently waiting on the wait group.
+func (r *requestsWaiter) GetWaiterCount() int {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.waitCounter
+}
+
+// GetOutstandingRequestsCount gets the number of requests outstanding.
+func (r *requestsWaiter) GetOutstandingRequestsCount() int {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.counter
+}
diff --git a/go/vt/vttablet/tabletserver/requests_waiter_test.go b/go/vt/vttablet/tabletserver/requests_waiter_test.go
new file mode 100644
index 00000000000..078e32e92ca
--- /dev/null
+++ b/go/vt/vttablet/tabletserver/requests_waiter_test.go
@@ -0,0 +1,57 @@
+/*
+Copyright 2024 The Vitess Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package tabletserver
+
+import (
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/require"
+)
+
+// TestRequestWaiter tests the functionality of the requests waiter.
+func TestRequestWaiter(t *testing.T) {
+	rw := newRequestsWaiter()
+	require.EqualValues(t, 0, rw.GetWaiterCount())
+	require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
+
+	rw.Add(3)
+	require.EqualValues(t, 0, rw.GetWaiterCount())
+	require.EqualValues(t, 3, rw.GetOutstandingRequestsCount())
+
+	rw.Done()
+	require.EqualValues(t, 0, rw.GetWaiterCount())
+	require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())
+
+	go func() {
+		rw.WaitToBeEmpty()
+	}()
+	go func() {
+		rw.WaitToBeEmpty()
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+	require.EqualValues(t, 2, rw.GetWaiterCount())
+	require.EqualValues(t, 2, rw.GetOutstandingRequestsCount())
+
+	rw.Done()
+	rw.Done()
+
+	time.Sleep(100 * time.Millisecond)
+	require.EqualValues(t, 0, rw.GetWaiterCount())
+	require.EqualValues(t, 0, rw.GetOutstandingRequestsCount())
+}
diff --git a/go/vt/vttablet/tabletserver/state_manager.go b/go/vt/vttablet/tabletserver/state_manager.go
index 9c01610f770..60b1f1281d0 100644
--- a/go/vt/vttablet/tabletserver/state_manager.go
+++ b/go/vt/vttablet/tabletserver/state_manager.go
@@ -99,12 +99,8 @@ type stateManager struct {
 	alsoAllow      []topodatapb.TabletType
 	reason         string
 	transitionErr  error
-	// requestsWaitCounter is the number of goroutines that are waiting for requests to be empty.
-	// If this value is greater than zero, then we have to ensure that we don't Add to the requests
-	// to avoid any panics in the wait.
-	requestsWaitCounter int
 
-	requests sync.WaitGroup
+	rw *requestsWaiter
 
 	// QueryList does not have an Open or Close.
 	statelessql *QueryList
@@ -358,20 +354,6 @@ func (sm *stateManager) checkMySQL() {
 	}()
 }
 
-// addRequestsWaitCounter adds to the requestsWaitCounter while being protected by a mutex.
-func (sm *stateManager) addRequestsWaitCounter(val int) {
-	sm.mu.Lock()
-	defer sm.mu.Unlock()
-	sm.requestsWaitCounter += val
-}
-
-// waitForRequestsToBeEmpty waits for requests to be empty. It also increments and decrements the requestsWaitCounter as required.
-func (sm *stateManager) waitForRequestsToBeEmpty() {
-	sm.addRequestsWaitCounter(1)
-	sm.requests.Wait()
-	sm.addRequestsWaitCounter(-1)
-}
-
 func (sm *stateManager) setWantState(stateWanted servingState) {
 	sm.mu.Lock()
 	defer sm.mu.Unlock()
@@ -410,9 +392,9 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
 	}
 
 	shuttingDown := sm.wantState != StateServing
-	// If requestsWaitCounter is not zero, then there are go-routines blocked on waiting for requests to be empty.
+	// If the wait counter for the requests is not zero, then there are goroutines blocked waiting for the requests to be empty.
 	// We cannot allow adding to the requests to prevent any panics from happening.
-	if (shuttingDown && !allowOnShutdown) || sm.requestsWaitCounter > 0 {
+	if (shuttingDown && !allowOnShutdown) || sm.rw.GetWaiterCount() > 0 {
 		// This specific error string needs to be returned for vtgate buffering to work.
 		return vterrors.New(vtrpcpb.Code_CLUSTER_EVENT, vterrors.ShuttingDown)
 	}
@@ -421,13 +403,13 @@ func (sm *stateManager) StartRequest(ctx context.Context, target *querypb.Target
 	if err != nil {
 		return err
 	}
-	sm.requests.Add(1)
+	sm.rw.Add(1)
 	return nil
 }
 
 // EndRequest unregisters the current request (a waitgroup) as done.
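// Every EndRequest must pair with a prior successful StartRequest; both now go
// through the shared requestsWaiter, which keeps the outstanding-request count
// and the WaitGroup consistent under a single mutex.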
 func (sm *stateManager) EndRequest() {
-	sm.requests.Done()
+	sm.rw.Done()
 }
 
 // VerifyTarget allows requests to be executed even in non-serving state.
@@ -507,7 +489,7 @@ func (sm *stateManager) unservePrimary() error {
 func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) error {
 	// We are likely transitioning from primary. We have to honor
 	// the shutdown grace period.
-	cancel := sm.handleShutdownGracePeriod()
+	cancel := sm.terminateAllQueries(nil)
 	defer cancel()
 
 	sm.ddle.Close()
@@ -560,9 +542,12 @@ func (sm *stateManager) connect(tabletType topodatapb.TabletType) error {
 }
 
 func (sm *stateManager) unserveCommon() {
+	// We create a wait group that tracks whether all the queries have been terminated or not.
+	wg := sync.WaitGroup{}
+	wg.Add(1)
 	log.Infof("Started execution of unserveCommon")
-	cancel := sm.handleShutdownGracePeriod()
-	log.Infof("Finished execution of handleShutdownGracePeriod")
+	cancel := sm.terminateAllQueries(&wg)
+	log.Infof("Finished execution of terminateAllQueries")
 	defer cancel()
 
 	log.Infof("Started online ddl executor close")
@@ -580,22 +565,45 @@ func (sm *stateManager) unserveCommon() {
 	log.Info("Finished Killing all OLAP queries. Started tracker close")
 	sm.tracker.Close()
 	log.Infof("Finished tracker close. Started wait for requests")
-	sm.waitForRequestsToBeEmpty()
-	log.Infof("Finished wait for requests. Finished execution of unserveCommon")
+	sm.handleShutdownGracePeriod(&wg)
+	log.Infof("Finished handling grace period. Finished execution of unserveCommon")
+}
+
+// handleShutdownGracePeriod checks whether a shutdownGracePeriod is specified.
+// If it isn't, then we have to wait for all the requests to be empty.
+// Otherwise, we only wait for all the queries against MySQL to be terminated.
+func (sm *stateManager) handleShutdownGracePeriod(wg *sync.WaitGroup) {
+	// If there is no shutdown grace period specified, then we should wait for all the requests to be empty.
+	if sm.shutdownGracePeriod == 0 {
+		sm.rw.WaitToBeEmpty()
+	} else {
+		// We quickly check if the requests are empty or not.
+		// If they are, then we don't need to wait for the shutdown to complete.
+		count := sm.rw.GetOutstandingRequestsCount()
+		if count == 0 {
+			return
+		}
+		// Otherwise, we should wait for all OLAP queries to be killed.
+		// We don't need to wait for requests to be empty since we have ensured all the queries against MySQL have been killed.
+		wg.Wait()
+	}
 }
 
-func (sm *stateManager) handleShutdownGracePeriod() (cancel func()) {
+func (sm *stateManager) terminateAllQueries(wg *sync.WaitGroup) (cancel func()) {
 	if sm.shutdownGracePeriod == 0 {
 		return func() {}
 	}
 	ctx, cancel := context.WithCancel(context.TODO())
 	go func() {
+		if wg != nil {
+			defer wg.Done()
+		}
 		if err := timer.SleepContext(ctx, sm.shutdownGracePeriod); err != nil {
 			return
 		}
 		log.Infof("Grace Period %v exceeded. 
Killing all OLTP queries.", sm.shutdownGracePeriod) sm.statelessql.TerminateAll() - log.Infof("Killed all stateful OLTP queries.") + log.Infof("Killed all stateless OLTP queries.") sm.statefulql.TerminateAll() log.Infof("Killed all OLTP queries.") }() @@ -645,7 +653,7 @@ func (sm *stateManager) setState(tabletType topodatapb.TabletType, state serving log.Infof("TabletServer transition: %v -> %v for tablet %s:%s/%s", sm.stateStringLocked(sm.target.TabletType, sm.state), sm.stateStringLocked(tabletType, state), sm.target.Cell, sm.target.Keyspace, sm.target.Shard) - sm.handleGracePeriod(tabletType) + sm.handleTransitionGracePeriod(tabletType) sm.target.TabletType = tabletType if sm.state == StateNotConnected { // If we're transitioning out of StateNotConnected, we have @@ -664,7 +672,7 @@ func (sm *stateManager) stateStringLocked(tabletType topodatapb.TabletType, stat return fmt.Sprintf("%v: %v, %v", tabletType, state, sm.ptsTimestamp.Local().Format("Jan 2, 2006 at 15:04:05 (MST)")) } -func (sm *stateManager) handleGracePeriod(tabletType topodatapb.TabletType) { +func (sm *stateManager) handleTransitionGracePeriod(tabletType topodatapb.TabletType) { if tabletType != topodatapb.TabletType_PRIMARY { // We allow serving of previous type only for a primary transition. sm.alsoAllow = nil diff --git a/go/vt/vttablet/tabletserver/state_manager_test.go b/go/vt/vttablet/tabletserver/state_manager_test.go index 59909888935..a0ef3557074 100644 --- a/go/vt/vttablet/tabletserver/state_manager_test.go +++ b/go/vt/vttablet/tabletserver/state_manager_test.go @@ -721,7 +721,7 @@ func TestPanicInWait(t *testing.T) { // Simulate going to a not serving state and calling unserveCommon that waits on requests. sm.wantState = StateNotServing - sm.waitForRequestsToBeEmpty() + sm.rw.WaitToBeEmpty() } func verifySubcomponent(t *testing.T, order int64, component any, state testState) { @@ -752,6 +752,7 @@ func newTestStateManager(t *testing.T) *stateManager { ddle: &testOnlineDDLExecutor{}, throttler: &testLagThrottler{}, tableGC: &testTableGC{}, + rw: newRequestsWaiter(), } sm.Init(env, &querypb.Target{}) sm.hs.InitDBConfig(&querypb.Target{}, dbconfigs.New(fakesqldb.New(t).ConnParams())) diff --git a/go/vt/vttablet/tabletserver/tabletenv/config.go b/go/vt/vttablet/tabletserver/tabletenv/config.go index 233f8951227..72682a75e30 100644 --- a/go/vt/vttablet/tabletserver/tabletenv/config.go +++ b/go/vt/vttablet/tabletserver/tabletenv/config.go @@ -975,6 +975,9 @@ var defaultConfig = TabletConfig{ Mode: Disable, HeartbeatInterval: 250 * time.Millisecond, }, + GracePeriods: GracePeriodsConfig{ + Shutdown: 3 * time.Second, + }, HotRowProtection: HotRowProtectionConfig{ Mode: Disable, // Default value is the same as TxPool.Size. 
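The new 3-second default above is what the updated golden output in the following `config_test.go` hunk encodes. As a quick illustration (a sketch only, not part of this patch; it assumes it sits in package `tabletenv` next to `defaultConfig`), the same default can be asserted directly:

```go
package tabletenv

import (
	"testing"
	"time"
)

// Sketch only (hypothetical test, not part of this patch): spot-checks the new
// default that defaultConfig now carries. Operators can restore the old
// behaviour with --shutdown_grace_period=0s.
func TestShutdownGracePeriodDefaultSketch(t *testing.T) {
	if got, want := defaultConfig.GracePeriods.Shutdown, 3*time.Second; got != want {
		t.Fatalf("default shutdown grace period = %v, want %v", got, want)
	}
}
```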
diff --git a/go/vt/vttablet/tabletserver/tabletenv/config_test.go b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
index 98d4cfceb21..ace094ac899 100644
--- a/go/vt/vttablet/tabletserver/tabletenv/config_test.go
+++ b/go/vt/vttablet/tabletserver/tabletenv/config_test.go
@@ -123,7 +123,8 @@ func TestDefaultConfig(t *testing.T) {
 	want := `consolidator: enable
 consolidatorStreamQuerySize: 2097152
 consolidatorStreamTotalSize: 134217728
-gracePeriods: {}
+gracePeriods:
+  shutdownSeconds: 3s
 healthcheck:
   degradedThresholdSeconds: 30s
   intervalSeconds: 20s
diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go
index dad637a6daf..8a1a45ca4a2 100644
--- a/go/vt/vttablet/tabletserver/tabletserver.go
+++ b/go/vt/vttablet/tabletserver/tabletserver.go
@@ -204,6 +204,7 @@ func NewTabletServer(ctx context.Context, env *vtenv.Environment, name string, c
 		ddle:            tsv.onlineDDLExecutor,
 		throttler:       tsv.lagThrottler,
 		tableGC:         tsv.tableGC,
+		rw:              newRequestsWaiter(),
 	}
 
 	tsv.exporter.NewGaugeFunc("TabletState", "Tablet server state", func() int64 { return int64(tsv.sm.State()) })
diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go
index 9744b971946..df68c8b0a83 100644
--- a/go/vt/vttablet/tabletserver/tabletserver_test.go
+++ b/go/vt/vttablet/tabletserver/tabletserver_test.go
@@ -153,6 +153,10 @@ func TestTabletServerPrimaryToReplica(t *testing.T) {
 	defer cancel()
 	// Reuse code from tx_executor_test.
 	_, tsv, db := newTestTxExecutor(t, ctx)
+	// This is required because the test is verifying that we roll back transactions on changing serving type,
+	// but that only happens immediately if the shutdown grace period is not specified.
+	tsv.te.shutdownGracePeriod = 0
+	tsv.sm.shutdownGracePeriod = 0
 	defer tsv.StopService()
 	defer db.Close()
 	target := querypb.Target{TabletType: topodatapb.TabletType_PRIMARY}
@@ -180,7 +184,7 @@ func TestTabletServerPrimaryToReplica(t *testing.T) {
 	select {
 	case <-ch:
 		t.Fatal("ch should not fire")
-	case <-time.After(10 * time.Millisecond):
+	case <-time.After(100 * time.Millisecond):
 	}
 	require.EqualValues(t, 1, tsv.te.txPool.scp.active.Size(), "tsv.te.txPool.scp.active.Size()")
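For readers who want to experiment with the synchronization pattern outside the tree, here is a self-contained mirror of the requestsWaiter idea. The standalone package, type name, and `main` below are illustrative only; the real, unexported type is the one this patch adds in `go/vt/vttablet/tabletserver/requests_waiter.go`:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waiter mirrors the requestsWaiter added by this patch: a WaitGroup guarded
// by a mutex, plus a count of goroutines currently blocked in Wait. While that
// count is non-zero, callers (like StartRequest) must not Add, which is what
// prevents the "Add called concurrently with Wait" panic.
type waiter struct {
	mu          sync.Mutex
	wg          sync.WaitGroup
	waitCounter int
	counter     int
}

func (w *waiter) Add(val int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.counter += val
	w.wg.Add(val)
}

func (w *waiter) Done() { w.Add(-1) }

func (w *waiter) WaitToBeEmpty() {
	w.mu.Lock()
	w.waitCounter++
	w.mu.Unlock()

	w.wg.Wait()

	w.mu.Lock()
	w.waitCounter--
	w.mu.Unlock()
}

func (w *waiter) WaiterCount() int {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.waitCounter
}

func main() {
	w := &waiter{}
	w.Add(2) // two in-flight requests

	drained := make(chan struct{})
	go func() {
		w.WaitToBeEmpty() // blocks until both requests call Done
		close(drained)
	}()

	time.Sleep(10 * time.Millisecond)
	// While WaiterCount() > 0, the state manager refuses new requests instead
	// of calling Add, exactly like StartRequest does in this patch.
	fmt.Println("waiters:", w.WaiterCount()) // typically prints: waiters: 1

	w.Done()
	w.Done()
	<-drained
	fmt.Println("all requests drained")
}
```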