From afbce6aa877d66c99ddb942ebe610d0ec5a5e5f1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vicent=20Mart=C3=AD?= <42793+vmg@users.noreply.github.com> Date: Thu, 23 May 2024 09:21:40 +0200 Subject: [PATCH] connpool: Allow time out during shutdown (#15979) Signed-off-by: Vicent Marti --- go/pools/smartconnpool/pool.go | 101 ++++++++++++++---- go/pools/smartconnpool/pool_test.go | 72 +++++++++++-- go/vt/vttablet/endtoend/misc_test.go | 6 +- go/vt/vttablet/endtoend/stream_test.go | 7 +- go/vt/vttablet/tabletserver/connpool/pool.go | 6 -- .../tabletserver/connpool/pool_test.go | 7 +- go/vt/vttablet/tabletserver/debugenv.go | 18 +++- go/vt/vttablet/tabletserver/query_executor.go | 20 +--- go/vt/vttablet/tabletserver/tabletserver.go | 14 +-- .../tabletserver/tabletserver_test.go | 12 ++- go/vt/vttablet/tabletserver/tx_pool_test.go | 3 +- 11 files changed, 189 insertions(+), 77 deletions(-) diff --git a/go/pools/smartconnpool/pool.go b/go/pools/smartconnpool/pool.go index a4317dc8447..d49032f34a1 100644 --- a/go/pools/smartconnpool/pool.go +++ b/go/pools/smartconnpool/pool.go @@ -32,10 +32,16 @@ import ( var ( // ErrTimeout is returned if a connection get times out. - ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "resource pool timed out") + ErrTimeout = vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "connection pool timed out") // ErrCtxTimeout is returned if a ctx is already expired by the time the connection pool is used - ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "resource pool context already expired") + ErrCtxTimeout = vterrors.New(vtrpcpb.Code_DEADLINE_EXCEEDED, "connection pool context already expired") + + // ErrConnPoolClosed is returned when trying to get a connection from a closed conn pool + ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "connection pool is closed") + + // PoolCloseTimeout is how long to wait for all connections to be returned to the pool during close + PoolCloseTimeout = 10 * time.Second ) type Metrics struct { @@ -119,8 +125,9 @@ type ConnPool[C Connection] struct { capacity atomic.Int64 // workers is a waitgroup for all the currently running worker goroutines - workers sync.WaitGroup - close chan struct{} + workers sync.WaitGroup + close chan struct{} + capacityMu sync.Mutex config struct { // connect is the callback to create a new connection for the pool @@ -142,6 +149,7 @@ type ConnPool[C Connection] struct { } Metrics Metrics + Name string } // NewPool creates a new connection pool with the given Config. @@ -236,29 +244,60 @@ func (pool *ConnPool[C]) Open(connect Connector[C], refresh RefreshCheck) *ConnP // Close shuts down the pool. No connections will be returned from ConnPool.Get after calling this, // but calling ConnPool.Put is still allowed. This function will not return until all of the pool's -// connections have been returned. +// connections have been returned or the default PoolCloseTimeout has elapsed func (pool *ConnPool[C]) Close() { - if pool.close == nil { + ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout) + defer cancel() + + if err := pool.CloseWithContext(ctx); err != nil { + log.Errorf("failed to close pool %q: %v", pool.Name, err) + } +} + +// CloseWithContext behaves like Close but allows passing in a Context to time out the +// pool closing operation +func (pool *ConnPool[C]) CloseWithContext(ctx context.Context) error { + pool.capacityMu.Lock() + defer pool.capacityMu.Unlock() + + if pool.close == nil || pool.capacity.Load() == 0 { // already closed - return + return nil } - pool.SetCapacity(0) + // close all the connections in the pool; if we time out while waiting for + // users to return our connections, we still want to finish the shutdown + // for the pool + err := pool.setCapacity(ctx, 0) close(pool.close) pool.workers.Wait() pool.close = nil + return err } func (pool *ConnPool[C]) reopen() { + pool.capacityMu.Lock() + defer pool.capacityMu.Unlock() + capacity := pool.capacity.Load() if capacity == 0 { return } - pool.Close() - pool.open() - pool.SetCapacity(capacity) + ctx, cancel := context.WithTimeout(context.Background(), PoolCloseTimeout) + defer cancel() + + // to re-open the connection pool, first set the capacity to 0 so we close + // all the existing connections, as they're now connected to a stale MySQL + // instance. + if err := pool.setCapacity(ctx, 0); err != nil { + log.Errorf("failed to reopen pool %q: %v", pool.Name, err) + } + + // the second call to setCapacity cannot fail because it's only increasing the number + // of connections and doesn't need to shut down any + _ = pool.setCapacity(ctx, capacity) } // IsOpen returns whether the pool is open @@ -322,7 +361,7 @@ func (pool *ConnPool[C]) Get(ctx context.Context, setting *Setting) (*Pooled[C], return nil, ErrCtxTimeout } if pool.capacity.Load() == 0 { - return nil, ErrTimeout + return nil, ErrConnPoolClosed } if setting == nil { return pool.get(ctx) @@ -571,39 +610,55 @@ func (pool *ConnPool[C]) getWithSetting(ctx context.Context, setting *Setting) ( // If the capacity is smaller than the number of connections that there are // currently open, we'll close enough connections before returning, even if // that means waiting for clients to return connections to the pool. -func (pool *ConnPool[C]) SetCapacity(newcap int64) { +// If the given context times out before we've managed to close enough connections +// an error will be returned. +func (pool *ConnPool[C]) SetCapacity(ctx context.Context, newcap int64) error { + pool.capacityMu.Lock() + defer pool.capacityMu.Unlock() + return pool.setCapacity(ctx, newcap) +} + +// setCapacity is the internal implementation for SetCapacity; it must be called +// with pool.capacityMu being held +func (pool *ConnPool[C]) setCapacity(ctx context.Context, newcap int64) error { if newcap < 0 { panic("negative capacity") } oldcap := pool.capacity.Swap(newcap) if oldcap == newcap { - return + return nil } - backoff := 1 * time.Millisecond + const delay = 10 * time.Millisecond // close connections until we're under capacity for pool.active.Load() > newcap { + if err := ctx.Err(); err != nil { + return vterrors.Errorf(vtrpcpb.Code_ABORTED, + "timed out while waiting for connections to be returned to the pool (capacity=%d, active=%d, borrowed=%d)", + pool.capacity.Load(), pool.active.Load(), pool.borrowed.Load()) + } + // if we're closing down the pool, make sure there's no clients waiting + // for connections because they won't be returned in the future + if newcap == 0 { + pool.wait.expire(true) + } + // try closing from connections which are currently idle in the stacks conn := pool.getFromSettingsStack(nil) if conn == nil { conn, _ = pool.clean.Pop() } if conn == nil { - time.Sleep(backoff) - backoff += 1 * time.Millisecond + time.Sleep(delay) continue } conn.Close() pool.closedConn() } - // if we're closing down the pool, wake up any blocked waiters because no connections - // are going to be returned in the future - if newcap == 0 { - pool.wait.expire(true) - } + return nil } func (pool *ConnPool[C]) closeIdleResources(now time.Time) { @@ -659,6 +714,8 @@ func (pool *ConnPool[C]) RegisterStats(stats *servenv.Exporter, name string) { return } + pool.Name = name + stats.NewGaugeFunc(name+"Capacity", "Tablet server conn pool capacity", func() int64 { return pool.Capacity() }) diff --git a/go/pools/smartconnpool/pool_test.go b/go/pools/smartconnpool/pool_test.go index 9a9fb9500b6..701327005ad 100644 --- a/go/pools/smartconnpool/pool_test.go +++ b/go/pools/smartconnpool/pool_test.go @@ -208,13 +208,15 @@ func TestOpen(t *testing.T) { assert.EqualValues(t, 6, state.lastID.Load()) // SetCapacity - p.SetCapacity(3) + err = p.SetCapacity(ctx, 3) + require.NoError(t, err) assert.EqualValues(t, 3, state.open.Load()) assert.EqualValues(t, 6, state.lastID.Load()) assert.EqualValues(t, 3, p.Capacity()) assert.EqualValues(t, 3, p.Available()) - p.SetCapacity(6) + err = p.SetCapacity(ctx, 6) + require.NoError(t, err) assert.EqualValues(t, 6, p.Capacity()) assert.EqualValues(t, 6, p.Available()) @@ -265,7 +267,9 @@ func TestShrinking(t *testing.T) { } done := make(chan bool) go func() { - p.SetCapacity(3) + err := p.SetCapacity(ctx, 3) + require.NoError(t, err) + done <- true }() expected := map[string]any{ @@ -335,7 +339,8 @@ func TestShrinking(t *testing.T) { // This will also wait go func() { - p.SetCapacity(2) + err := p.SetCapacity(ctx, 2) + require.NoError(t, err) done <- true }() time.Sleep(10 * time.Millisecond) @@ -353,7 +358,8 @@ func TestShrinking(t *testing.T) { assert.EqualValues(t, 2, state.open.Load()) // Test race condition of SetCapacity with itself - p.SetCapacity(3) + err = p.SetCapacity(ctx, 3) + require.NoError(t, err) for i := 0; i < 3; i++ { var r *Pooled[*TestConn] var err error @@ -375,9 +381,15 @@ func TestShrinking(t *testing.T) { time.Sleep(10 * time.Millisecond) // This will wait till we Put - go p.SetCapacity(2) + go func() { + err := p.SetCapacity(ctx, 2) + require.NoError(t, err) + }() time.Sleep(10 * time.Millisecond) - go p.SetCapacity(4) + go func() { + err := p.SetCapacity(ctx, 4) + require.NoError(t, err) + }() time.Sleep(10 * time.Millisecond) // This should not hang @@ -387,7 +399,7 @@ func TestShrinking(t *testing.T) { <-done assert.Panics(t, func() { - p.SetCapacity(-1) + _ = p.SetCapacity(ctx, -1) }) assert.EqualValues(t, 4, p.Capacity()) @@ -530,6 +542,46 @@ func TestReopen(t *testing.T) { assert.EqualValues(t, 0, state.open.Load()) } +func TestUserClosing(t *testing.T) { + var state TestState + + ctx := context.Background() + p := NewPool(&Config[*TestConn]{ + Capacity: 5, + IdleTimeout: time.Second, + LogWait: state.LogWait, + }).Open(newConnector(&state), nil) + + var resources [5]*Pooled[*TestConn] + for i := 0; i < 5; i++ { + var err error + resources[i], err = p.Get(ctx, nil) + require.NoError(t, err) + } + + for _, r := range resources[:4] { + r.Recycle() + } + + ch := make(chan error) + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err := p.CloseWithContext(ctx) + ch <- err + close(ch) + }() + + select { + case <-time.After(5 * time.Second): + t.Fatalf("Pool did not shutdown after 5s") + case err := <-ch: + require.Error(t, err) + t.Logf("Shutdown error: %v", err) + } +} + func TestIdleTimeout(t *testing.T) { testTimeout := func(t *testing.T, setting *Setting) { var state TestState @@ -818,7 +870,7 @@ func TestTimeout(t *testing.T) { newctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond) _, err = p.Get(newctx, setting) cancel() - assert.EqualError(t, err, "resource pool timed out") + assert.EqualError(t, err, "connection pool timed out") } @@ -842,7 +894,7 @@ func TestExpired(t *testing.T) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(-1*time.Second)) _, err := p.Get(ctx, setting) cancel() - require.EqualError(t, err, "resource pool context already expired") + require.EqualError(t, err, "connection pool context already expired") } } diff --git a/go/vt/vttablet/endtoend/misc_test.go b/go/vt/vttablet/endtoend/misc_test.go index c054179c20a..768399572db 100644 --- a/go/vt/vttablet/endtoend/misc_test.go +++ b/go/vt/vttablet/endtoend/misc_test.go @@ -261,8 +261,10 @@ func TestSidecarTables(t *testing.T) { } func TestConsolidation(t *testing.T) { - defer framework.Server.SetPoolSize(framework.Server.PoolSize()) - framework.Server.SetPoolSize(1) + defer framework.Server.SetPoolSize(context.Background(), framework.Server.PoolSize()) + + err := framework.Server.SetPoolSize(context.Background(), 1) + require.NoError(t, err) const tag = "Waits/Histograms/Consolidations/Count" diff --git a/go/vt/vttablet/endtoend/stream_test.go b/go/vt/vttablet/endtoend/stream_test.go index 05045fd6f7d..a3c73dd8152 100644 --- a/go/vt/vttablet/endtoend/stream_test.go +++ b/go/vt/vttablet/endtoend/stream_test.go @@ -17,6 +17,7 @@ limitations under the License. package endtoend import ( + "context" "errors" "fmt" "reflect" @@ -98,11 +99,13 @@ func TestStreamConsolidation(t *testing.T) { defaultPoolSize := framework.Server.StreamPoolSize() - framework.Server.SetStreamPoolSize(4) + err = framework.Server.SetStreamPoolSize(context.Background(), 4) + require.NoError(t, err) + framework.Server.SetStreamConsolidationBlocking(true) defer func() { - framework.Server.SetStreamPoolSize(defaultPoolSize) + _ = framework.Server.SetStreamPoolSize(context.Background(), defaultPoolSize) framework.Server.SetStreamConsolidationBlocking(false) }() diff --git a/go/vt/vttablet/tabletserver/connpool/pool.go b/go/vt/vttablet/tabletserver/connpool/pool.go index 567745e37b5..14fcc6d0f2e 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool.go +++ b/go/vt/vttablet/tabletserver/connpool/pool.go @@ -31,15 +31,9 @@ import ( "vitess.io/vitess/go/vt/dbconnpool" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/servenv" - "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" - - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) -// ErrConnPoolClosed is returned when the connection pool is closed. -var ErrConnPoolClosed = vterrors.New(vtrpcpb.Code_INTERNAL, "internal error: unexpected: conn pool is closed") - const ( getWithoutS = "GetWithoutSettings" getWithS = "GetWithSettings" diff --git a/go/vt/vttablet/tabletserver/connpool/pool_test.go b/go/vt/vttablet/tabletserver/connpool/pool_test.go index 28f3e27803a..8cf27cbb327 100644 --- a/go/vt/vttablet/tabletserver/connpool/pool_test.go +++ b/go/vt/vttablet/tabletserver/connpool/pool_test.go @@ -67,7 +67,7 @@ func TestConnPoolTimeout(t *testing.T) { require.NoError(t, err) defer dbConn.Recycle() _, err = connPool.Get(context.Background(), nil) - assert.EqualError(t, err, "resource pool timed out") + assert.EqualError(t, err, "connection pool timed out") } func TestConnPoolGetEmptyDebugConfig(t *testing.T) { @@ -126,9 +126,10 @@ func TestConnPoolSetCapacity(t *testing.T) { defer connPool.Close() assert.Panics(t, func() { - connPool.SetCapacity(-10) + _ = connPool.SetCapacity(context.Background(), -10) }) - connPool.SetCapacity(10) + err := connPool.SetCapacity(context.Background(), 10) + assert.NoError(t, err) if connPool.Capacity() != 10 { t.Fatalf("capacity should be 10") } diff --git a/go/vt/vttablet/tabletserver/debugenv.go b/go/vt/vttablet/tabletserver/debugenv.go index c780a28ed90..924d5acbebb 100644 --- a/go/vt/vttablet/tabletserver/debugenv.go +++ b/go/vt/vttablet/tabletserver/debugenv.go @@ -17,6 +17,7 @@ limitations under the License. package tabletserver import ( + "context" "encoding/json" "fmt" "html" @@ -82,6 +83,17 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) f(ival) msg = fmt.Sprintf("Setting %v to: %v", varname, value) } + setIntValCtx := func(f func(context.Context, int) error) { + ival, err := strconv.Atoi(value) + if err == nil { + err = f(r.Context(), ival) + if err == nil { + msg = fmt.Sprintf("Setting %v to: %v", varname, value) + return + } + } + msg = fmt.Sprintf("Failed setting value for %v: %v", varname, err) + } setInt64Val := func(f func(int64)) { ival, err := strconv.ParseInt(value, 10, 64) if err != nil { @@ -111,11 +123,11 @@ func debugEnvHandler(tsv *TabletServer, w http.ResponseWriter, r *http.Request) } switch varname { case "PoolSize": - setIntVal(tsv.SetPoolSize) + setIntValCtx(tsv.SetPoolSize) case "StreamPoolSize": - setIntVal(tsv.SetStreamPoolSize) + setIntValCtx(tsv.SetStreamPoolSize) case "TxPoolSize": - setIntVal(tsv.SetTxPoolSize) + setIntValCtx(tsv.SetTxPoolSize) case "MaxResultSize": setIntVal(tsv.SetMaxResultSize) case "WarnResultSize": diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index b538f9342eb..d5099b1a0cc 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -780,15 +780,7 @@ func (qre *QueryExecutor) getConn() (*connpool.PooledConn, error) { defer func(start time.Time) { qre.logStats.WaitingForConnection += time.Since(start) }(time.Now()) - conn, err := qre.tsv.qe.conns.Get(ctx, qre.setting) - - switch err { - case nil: - return conn, nil - case connpool.ErrConnPoolClosed: - return nil, err - } - return nil, err + return qre.tsv.qe.conns.Get(ctx, qre.setting) } func (qre *QueryExecutor) getStreamConn() (*connpool.PooledConn, error) { @@ -798,15 +790,7 @@ func (qre *QueryExecutor) getStreamConn() (*connpool.PooledConn, error) { defer func(start time.Time) { qre.logStats.WaitingForConnection += time.Since(start) }(time.Now()) - conn, err := qre.tsv.qe.streamConns.Get(ctx, qre.setting) - - switch err { - case nil: - return conn, nil - case connpool.ErrConnPoolClosed: - return nil, err - } - return nil, err + return qre.tsv.qe.streamConns.Get(ctx, qre.setting) } // txFetch fetches from a TxConnection. diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 87a6904d99a..d74bcb09952 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -1996,11 +1996,11 @@ func (tsv *TabletServer) EnableHistorian(enabled bool) { } // SetPoolSize changes the pool size to the specified value. -func (tsv *TabletServer) SetPoolSize(val int) { +func (tsv *TabletServer) SetPoolSize(ctx context.Context, val int) error { if val <= 0 { - return + return nil } - tsv.qe.conns.SetCapacity(int64(val)) + return tsv.qe.conns.SetCapacity(ctx, int64(val)) } // PoolSize returns the pool size. @@ -2009,8 +2009,8 @@ func (tsv *TabletServer) PoolSize() int { } // SetStreamPoolSize changes the pool size to the specified value. -func (tsv *TabletServer) SetStreamPoolSize(val int) { - tsv.qe.streamConns.SetCapacity(int64(val)) +func (tsv *TabletServer) SetStreamPoolSize(ctx context.Context, val int) error { + return tsv.qe.streamConns.SetCapacity(ctx, int64(val)) } // SetStreamConsolidationBlocking sets whether the stream consolidator should wait for slow clients @@ -2024,8 +2024,8 @@ func (tsv *TabletServer) StreamPoolSize() int { } // SetTxPoolSize changes the tx pool size to the specified value. -func (tsv *TabletServer) SetTxPoolSize(val int) { - tsv.te.txPool.scp.conns.SetCapacity(int64(val)) +func (tsv *TabletServer) SetTxPoolSize(ctx context.Context, val int) error { + return tsv.te.txPool.scp.conns.SetCapacity(ctx, int64(val)) } // TxPoolSize returns the tx pool size. diff --git a/go/vt/vttablet/tabletserver/tabletserver_test.go b/go/vt/vttablet/tabletserver/tabletserver_test.go index 3e546546485..92bfa25650a 100644 --- a/go/vt/vttablet/tabletserver/tabletserver_test.go +++ b/go/vt/vttablet/tabletserver/tabletserver_test.go @@ -2133,7 +2133,9 @@ func TestConfigChanges(t *testing.T) { newSize := 10 newDuration := time.Duration(10 * time.Millisecond) - tsv.SetPoolSize(newSize) + err := tsv.SetPoolSize(context.Background(), newSize) + require.NoError(t, err) + if val := tsv.PoolSize(); val != newSize { t.Errorf("PoolSize: %d, want %d", val, newSize) } @@ -2141,7 +2143,9 @@ func TestConfigChanges(t *testing.T) { t.Errorf("tsv.qe.connPool.Capacity: %d, want %d", val, newSize) } - tsv.SetStreamPoolSize(newSize) + err = tsv.SetStreamPoolSize(context.Background(), newSize) + require.NoError(t, err) + if val := tsv.StreamPoolSize(); val != newSize { t.Errorf("StreamPoolSize: %d, want %d", val, newSize) } @@ -2149,7 +2153,9 @@ func TestConfigChanges(t *testing.T) { t.Errorf("tsv.qe.streamConnPool.Capacity: %d, want %d", val, newSize) } - tsv.SetTxPoolSize(newSize) + err = tsv.SetTxPoolSize(context.Background(), newSize) + require.NoError(t, err) + if val := tsv.TxPoolSize(); val != newSize { t.Errorf("TxPoolSize: %d, want %d", val, newSize) } diff --git a/go/vt/vttablet/tabletserver/tx_pool_test.go b/go/vt/vttablet/tabletserver/tx_pool_test.go index 37500ada79a..aa2d5b69e89 100644 --- a/go/vt/vttablet/tabletserver/tx_pool_test.go +++ b/go/vt/vttablet/tabletserver/tx_pool_test.go @@ -216,7 +216,8 @@ func primeTxPoolWithConnection(t *testing.T, ctx context.Context) (*fakesqldb.DB db := fakesqldb.New(t) txPool, _ := newTxPool() // Set the capacity to 1 to ensure that the db connection is reused. - txPool.scp.conns.SetCapacity(1) + err := txPool.scp.conns.SetCapacity(context.Background(), 1) + require.NoError(t, err) params := dbconfigs.New(db.ConnParams()) txPool.Open(params, params, params)