From 5e3ee6cc84e4e9bcb3e4457525f800d0c21c958a Mon Sep 17 00:00:00 2001 From: jairui Date: Sun, 24 Nov 2024 17:21:10 +0800 Subject: [PATCH 1/2] fix: merge schemaEngine connection pool into taskPool Signed-off-by: jairui --- .../vreplication/external_connector.go | 7 ++- .../vreplication/replica_connector.go | 8 ++- .../tabletserver/background/task_pool.go | 2 +- .../tabletserver/messager/engine_test.go | 4 +- .../tabletserver/query_engine_test.go | 53 ++++++++++++++----- go/vt/vttablet/tabletserver/queryz_test.go | 2 +- go/vt/vttablet/tabletserver/schema/engine.go | 36 ++++--------- .../tabletserver/schema/engine_test.go | 42 +++++++++++---- .../vttablet/tabletserver/schema/historian.go | 24 ++++----- .../vttablet/tabletserver/schema/main_test.go | 4 +- go/vt/vttablet/tabletserver/tabletserver.go | 2 +- .../tabletserver/vstreamer/testenv/testenv.go | 8 ++- 12 files changed, 121 insertions(+), 71 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go index 27801c0e63..979fc363e3 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/external_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/external_connector.go @@ -35,6 +35,7 @@ import ( "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vttablet/queryservice" "vitess.io/vitess/go/vt/vttablet/tabletconn" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer" @@ -92,12 +93,14 @@ func (ec *externalConnector) Get(name string) (*mysqlConnector, error) { } c := &mysqlConnector{} c.env = tabletenv.NewEnv(config, name) - c.se = schema.NewEngine(c.env) + c.taskPool = background.NewTaskPool(c.env) + c.se = schema.NewEngine(c.env, c.taskPool) c.vstreamer = vstreamer.NewEngine(c.env, nil, c.se, nil, "") c.vstreamer.InitDBConfig("", "") c.se.InitDBConfig(c.env.Config().DB.AllPrivsWithDB()) // Open + c.taskPool.Open() if err := c.se.Open(); err != nil { return nil, vterrors.Wrapf(err, "external mysqlConnector: %v", name) } @@ -114,11 +117,13 @@ type mysqlConnector struct { env tabletenv.Env se *schema.Engine vstreamer *vstreamer.Engine + taskPool *background.TaskPool } func (c *mysqlConnector) shutdown() { c.vstreamer.Close() c.se.Close() + c.taskPool.Close() } func (c *mysqlConnector) Open(ctx context.Context) error { diff --git a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go index 228b24ea30..7c56c57452 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go +++ b/go/vt/vttablet/tabletmanager/vreplication/replica_connector.go @@ -24,6 +24,7 @@ package vreplication import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/vt/dbconfigs" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "context" @@ -54,13 +55,14 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *ReplicaConnector { config.DB = dbCfg c := &ReplicaConnector{conn: connParams} env := tabletenv.NewEnv(config, "source") - c.se = schema.NewEngine(env) + c.taskPool = background.NewTaskPool(env) + c.se = schema.NewEngine(env, c.taskPool) c.se.SkipMetaCheck = true c.vstreamer = vstreamer.NewEngine(env, nil, c.se, nil, "") c.se.InitDBConfig(dbconfigs.New(connParams)) // Open - + c.taskPool.Open() c.vstreamer.Open() return c @@ -70,6 +72,7 @@ func NewReplicaConnector(connParams *mysql.ConnParams) *ReplicaConnector { type ReplicaConnector struct { conn *mysql.ConnParams + taskPool *background.TaskPool se *schema.Engine vstreamer *vstreamer.Engine } @@ -77,6 +80,7 @@ type ReplicaConnector struct { func (c *ReplicaConnector) shutdown() { c.vstreamer.Close() c.se.Close() + c.taskPool.Close() } func (c *ReplicaConnector) Open(ctx context.Context) error { diff --git a/go/vt/vttablet/tabletserver/background/task_pool.go b/go/vt/vttablet/tabletserver/background/task_pool.go index 84567e95c4..5b6b9984fb 100644 --- a/go/vt/vttablet/tabletserver/background/task_pool.go +++ b/go/vt/vttablet/tabletserver/background/task_pool.go @@ -13,7 +13,7 @@ import ( ) var backGroundTaskPoolConfig = tabletenv.ConnPoolConfig{ - Size: 1, + Size: 5, MaxSize: 50, TimeoutSeconds: 1, IdleTimeoutSeconds: 30 * 60, diff --git a/go/vt/vttablet/tabletserver/messager/engine_test.go b/go/vt/vttablet/tabletserver/messager/engine_test.go index 31d91b1c66..ca628fd0e9 100644 --- a/go/vt/vttablet/tabletserver/messager/engine_test.go +++ b/go/vt/vttablet/tabletserver/messager/engine_test.go @@ -25,6 +25,7 @@ import ( "vitess.io/vitess/go/sqltypes" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" ) @@ -160,7 +161,8 @@ func newTestEngine(db *fakesqldb.DB) *Engine { tsv := &fakeTabletServer{ Env: tabletenv.NewEnv(config, "MessagerTest"), } - se := schema.NewEngine(tsv) + taskPool := background.NewTaskPool(tsv) + se := schema.NewEngine(tsv, taskPool) te := NewEngine(tsv, se, newFakeVStreamer()) te.Open() return te diff --git a/go/vt/vttablet/tabletserver/query_engine_test.go b/go/vt/vttablet/tabletserver/query_engine_test.go index 6103983151..e645b961d2 100644 --- a/go/vt/vttablet/tabletserver/query_engine_test.go +++ b/go/vt/vttablet/tabletserver/query_engine_test.go @@ -50,6 +50,7 @@ import ( "vitess.io/vitess/go/streamlog" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/tableacl" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema/schematest" @@ -60,7 +61,9 @@ func TestGetPlanPanicDuetoEmptyQuery(t *testing.T) { db := fakesqldb.New(t) defer db.Close() schematest.AddDefaultQueries(db) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -95,7 +98,9 @@ func TestGetMessageStreamPlan(t *testing.T) { addSchemaEngineQueries(db) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -141,7 +146,9 @@ func TestQueryPlanCache(t *testing.T) { db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{}) db.AddQuery("select * from test_table_02 where 1 != 1", &sqltypes.Result{}) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -178,7 +185,9 @@ func TestNoQueryPlanCache(t *testing.T) { db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{}) db.AddQuery("select * from test_table_02 where 1 != 1", &sqltypes.Result{}) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -207,7 +216,9 @@ func TestNoQueryPlanCacheDirective(t *testing.T) { db.AddQuery("select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from test_table_01 where 1 != 1", &sqltypes.Result{}) db.AddQuery("select /*vt+ SKIP_QUERY_PLAN_CACHE=1 */ * from test_table_02 where 1 != 1", &sqltypes.Result{}) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -232,7 +243,9 @@ func TestStatsURL(t *testing.T) { schematest.AddDefaultQueries(db) query := "select * from test_table_01" db.AddQuery("select * from test_table_01 where 1 != 1", &sqltypes.Result{}) - qe := newTestQueryEngine(1*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(1*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -254,24 +267,27 @@ func TestStatsURL(t *testing.T) { qe.handleHTTPQueryRules(response, request) } -func newTestQueryEngine(idleTimeout time.Duration, _ bool, dbcfgs *dbconfigs.DBConfigs) *QueryEngine { +func newTestQueryEngine(idleTimeout time.Duration, _ bool, dbcfgs *dbconfigs.DBConfigs) (*QueryEngine, *background.TaskPool) { config := tabletenv.NewDefaultConfig() config.DB = dbcfgs config.OltpReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.OlapReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.TxPool.IdleTimeoutSeconds.Set(idleTimeout) env := tabletenv.NewEnv(config, "TabletServerTest") - se := schema.NewEngine(env) + taskPool := background.NewTaskPool(env) + se := schema.NewEngine(env, taskPool) qe := NewQueryEngine(env, se) se.InitDBConfig(dbcfgs.DbaWithDB()) - return qe + return qe, taskPool } func runConsolidatedQuery(t *testing.T, sql string) *QueryEngine { db := fakesqldb.New(t) defer db.Close() - qe := newTestQueryEngine(1*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(1*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -334,7 +350,9 @@ func BenchmarkPlanCacheThroughput(b *testing.B) { db.AddQueryPattern(".*", &sqltypes.Result{}) - qe := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + qe, taskPool := newTestQueryEngine(10*time.Second, true, newDBConfigs(db)) + taskPool.Open() + defer taskPool.Close() qe.se.Open() qe.Open() defer qe.Close() @@ -360,9 +378,12 @@ func benchmarkPlanCache(b *testing.B, db *fakesqldb.DB, lfu bool, par int) { config.QueryCacheLFU = lfu env := tabletenv.NewEnv(config, "TabletServerTest") - se := schema.NewEngine(env) + taskPool := background.NewTaskPool(env) + se := schema.NewEngine(env, taskPool) qe := NewQueryEngine(env, se) + taskPool.Open() + defer taskPool.Close() se.InitDBConfig(dbcfgs.DbaWithDB()) require.NoError(b, se.Open()) require.NoError(b, qe.Open()) @@ -422,9 +443,12 @@ func TestPlanCachePollution(t *testing.T) { // config.LFUQueryCacheSizeBytes = 3 * 1024 * 1024 env := tabletenv.NewEnv(config, "TabletServerTest") - se := schema.NewEngine(env) + taskPool := background.NewTaskPool(env) + se := schema.NewEngine(env, taskPool) qe := NewQueryEngine(env, se) + taskPool.Open() + defer taskPool.Close() se.InitDBConfig(dbcfgs.DbaWithDB()) se.Open() @@ -614,7 +638,8 @@ func TestAddQueryStats(t *testing.T) { config := tabletenv.NewDefaultConfig() config.DB = newDBConfigs(fakesqldb.New(t)) env := tabletenv.NewEnv(config, "TestAddQueryStats_"+testcase.name) - se := schema.NewEngine(env) + taskPool := background.NewTaskPool(env) + se := schema.NewEngine(env, taskPool) qe := NewQueryEngine(env, se) qe.AddStats(testcase.planType, testcase.tableName, testcase.queryCount, testcase.duration, testcase.mysqlTime, testcase.rowsAffected, testcase.rowsReturned, testcase.errorCount) assert.Equal(t, testcase.expectedQueryCounts, qe.queryCounts.String()) diff --git a/go/vt/vttablet/tabletserver/queryz_test.go b/go/vt/vttablet/tabletserver/queryz_test.go index 15ff49e3ec..06ae71acca 100644 --- a/go/vt/vttablet/tabletserver/queryz_test.go +++ b/go/vt/vttablet/tabletserver/queryz_test.go @@ -35,7 +35,7 @@ import ( func TestQueryzHandler(t *testing.T) { resp := httptest.NewRecorder() req, _ := http.NewRequest("GET", "/schemaz", nil) - qe := newTestQueryEngine(10*time.Second, true, &dbconfigs.DBConfigs{}) + qe, _ := newTestQueryEngine(10*time.Second, true, &dbconfigs.DBConfigs{}) const query1 = "select name from test_table" plan1 := &TabletPlan{ diff --git a/go/vt/vttablet/tabletserver/schema/engine.go b/go/vt/vttablet/tabletserver/schema/engine.go index 0f3164f2e0..1cfbfcd0d3 100644 --- a/go/vt/vttablet/tabletserver/schema/engine.go +++ b/go/vt/vttablet/tabletserver/schema/engine.go @@ -49,6 +49,7 @@ import ( "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -83,8 +84,8 @@ type Engine struct { historian *historian - conns *connpool.Pool - ticks *timer.Timer + taskPool *background.TaskPool + ticks *timer.Timer // dbCreationFailed is for preventing log spam. dbCreationFailed bool @@ -96,16 +97,11 @@ type Engine struct { } // NewEngine creates a new Engine. -func NewEngine(env tabletenv.Env) *Engine { +func NewEngine(env tabletenv.Env, taskPool *background.TaskPool) *Engine { reloadTime := env.Config().SchemaReloadIntervalSeconds.Get() se := &Engine{ - env: env, - // We need three connections: one for the reloader, one for - // the historian, and one for the tracker. - conns: connpool.NewPool(env, "SchemaEngine", tabletenv.ConnPoolConfig{ - Size: 3, - IdleTimeoutSeconds: env.Config().OltpReadPool.IdleTimeoutSeconds, - }), + env: env, + taskPool: taskPool, ticks: timer.NewTimer(reloadTime), reloadTime: reloadTime, } @@ -127,7 +123,7 @@ func NewEngine(env tabletenv.Env) *Engine { schemazHandler(se.GetFullyQualifiedTables(), w, r) }) - se.historian = newHistorian(env.Config().TrackSchemaVersions, se.conns) + se.historian = newHistorian(env.Config().TrackSchemaVersions, se.taskPool) return se } @@ -231,17 +227,6 @@ func (se *Engine) Open() error { ctx := tabletenv.LocalContext() - // The function we're in is supposed to be idempotent, but this conns.Open() - // call is not itself idempotent. Therefore, if we return for any reason - // without marking ourselves as open, we need to call conns.Close() so the - // pools aren't leaked the next time we call Open(). - se.conns.Open(se.cp, se.cp, se.cp) - defer func() { - if !se.isOpen { - se.conns.Close() - } - }() - se.tables = map[sqlparser.TableSchemaAndName]*Table{ sqlparser.NewTableSchemaAndName("", "dual"): NewTable("dual"), } @@ -301,7 +286,6 @@ func (se *Engine) closeLocked() { wg.Done() }() se.historian.Close() - se.conns.Close() se.tables = make(map[sqlparser.TableSchemaAndName]*Table) se.lastChange = 0 @@ -384,7 +368,7 @@ func (se *Engine) reload(ctx context.Context, includeStats bool) error { se.SchemaReloadTimings.Record("SchemaReload", start) }() - conn, err := se.conns.Get(ctx, nil) + conn, err := se.taskPool.BorrowConn(ctx, nil) if err != nil { return err } @@ -583,7 +567,7 @@ func (se *Engine) GetTableFromSchema(tableSchema string, tableName string) (*Tab setting.SetQuery(fmt.Sprintf("use %s", tableSchema)) setting.SetResetQuery(fmt.Sprintf("use %s", se.cp.DBName())) - conn, err := se.conns.Get(ctx, &setting) + conn, err := se.taskPool.BorrowConn(ctx, &setting) if err != nil { return nil, err } @@ -738,7 +722,7 @@ func (se *Engine) GetFullyQualifiedTables() map[string]*Table { // GetConnection returns a connection from the pool func (se *Engine) GetConnection(ctx context.Context) (*connpool.DBConn, error) { - return se.conns.Get(ctx, nil) + return se.taskPool.BorrowConn(ctx, nil) } func (se *Engine) handleDebugSchema(response http.ResponseWriter, request *http.Request) { diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index aa2399947a..25e65b243e 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -42,6 +42,7 @@ import ( "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema/schematest" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" @@ -81,7 +82,9 @@ func TestOpenAndReload(t *testing.T) { )) firstReadRowsValue := 12 AddFakeInnoDBReadRowsResult(db, firstReadRowsValue) - se := newEngine(10, 10*time.Second, 10*time.Second, db) + se, taskPool := newEngine(10, 10*time.Second, 10*time.Second, db) + taskPool.Open() + defer taskPool.Close() se.Open() defer se.Close() @@ -273,9 +276,12 @@ func TestReloadWithSwappedTables(t *testing.T) { firstReadRowsValue := 12 AddFakeInnoDBReadRowsResult(db, firstReadRowsValue) - se := newEngine(10, 10*time.Second, 10*time.Second, db) + se, taskPool := newEngine(10, 10*time.Second, 10*time.Second, db) + taskPool.Open() + defer taskPool.Close() se.Open() defer se.Close() + want := initialSchema() mustMatch(t, want, se.GetSchema2(dbName)) @@ -425,7 +431,10 @@ func TestOpenFailedDueToExecErr(t *testing.T) { schematest.AddDefaultQueries(db) want := "injected error" db.RejectQueryPattern(baseShowTablesPattern, want) - se := newEngine(10, 1*time.Second, 1*time.Second, db) + se, taskPool := newEngine(10, 1*time.Second, 1*time.Second, db) + taskPool.Open() + defer taskPool.Close() + err := se.Open() if err == nil || !strings.Contains(err.Error(), want) { t.Errorf("se.Open: %v, want %s", err, want) @@ -455,7 +464,10 @@ func TestOpenFailedDueToTableErr(t *testing.T) { }) AddFakeInnoDBReadRowsResult(db, 0) - se := newEngine(10, 1*time.Second, 1*time.Second, db) + se, taskPool := newEngine(10, 1*time.Second, 1*time.Second, db) + taskPool.Open() + defer taskPool.Close() + err := se.Open() want := "Row count exceeded" if err == nil || !strings.Contains(err.Error(), want) { @@ -467,9 +479,12 @@ func TestExportVars(t *testing.T) { db := fakesqldb.New(t) defer db.Close() schematest.AddDefaultQueries(db) - se := newEngine(10, 1*time.Second, 1*time.Second, db) + se, taskPool := newEngine(10, 1*time.Second, 1*time.Second, db) + taskPool.Open() + defer taskPool.Close() se.Open() defer se.Close() + expvar.Do(func(kv expvar.KeyValue) { _ = kv.Value.String() }) @@ -479,7 +494,9 @@ func TestStatsURL(t *testing.T) { db := fakesqldb.New(t) defer db.Close() schematest.AddDefaultQueries(db) - se := newEngine(10, 1*time.Second, 1*time.Second, db) + se, taskPool := newEngine(10, 1*time.Second, 1*time.Second, db) + taskPool.Open() + defer taskPool.Close() se.Open() defer se.Close() @@ -509,7 +526,10 @@ func TestSchemaEngineCloseTickRace(t *testing.T) { }) AddFakeInnoDBReadRowsResult(db, 12) // Start the engine with a small reload tick - se := newEngine(10, 100*time.Millisecond, 1*time.Second, db) + se, taskPool := newEngine(10, 100*time.Millisecond, 1*time.Second, db) + taskPool.Open() + defer taskPool.Close() + err := se.Open() require.NoError(t, err) @@ -536,16 +556,18 @@ func TestSchemaEngineCloseTickRace(t *testing.T) { } } -func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, db *fakesqldb.DB) *Engine { +func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Duration, db *fakesqldb.DB) (*Engine, *background.TaskPool) { config := tabletenv.NewDefaultConfig() config.QueryCacheSize = queryCacheSize config.SchemaReloadIntervalSeconds.Set(reloadTime) config.OltpReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.OlapReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.TxPool.IdleTimeoutSeconds.Set(idleTimeout) - se := NewEngine(tabletenv.NewEnv(config, "SchemaTest")) + env := tabletenv.NewEnv(config, "SchemaTest") + taskPool := background.NewTaskPool(env) + se := NewEngine(env, taskPool) se.InitDBConfig(newDBConfigs(db).DbaWithDB()) - return se + return se, taskPool } func newDBConfigs(db *fakesqldb.DB) *dbconfigs.DBConfigs { diff --git a/go/vt/vttablet/tabletserver/schema/historian.go b/go/vt/vttablet/tabletserver/schema/historian.go index 6744bf0377..a7f551560b 100644 --- a/go/vt/vttablet/tabletserver/schema/historian.go +++ b/go/vt/vttablet/tabletserver/schema/historian.go @@ -29,7 +29,7 @@ import ( "vitess.io/vitess/go/vt/log" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" "vitess.io/vitess/go/vt/vtgate/evalengine" - "vitess.io/vitess/go/vt/vttablet/tabletserver/connpool" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/sqlparser" @@ -51,20 +51,20 @@ type trackedSchema struct { // and supplying a schema for a specific version by loading the cached values from the schema_version table // The schema version table is populated by the Tracker type historian struct { - conns *connpool.Pool - lastID int64 - schemas []*trackedSchema - mu sync.Mutex - enabled bool - isOpen bool + taskPool *background.TaskPool + lastID int64 + schemas []*trackedSchema + mu sync.Mutex + enabled bool + isOpen bool } // newHistorian creates a new historian. It expects a schema.Engine instance -func newHistorian(enabled bool, conns *connpool.Pool) *historian { +func newHistorian(enabled bool, taskPool *background.TaskPool) *historian { sh := historian{ - conns: conns, - lastID: 0, - enabled: enabled, + taskPool: taskPool, + lastID: 0, + enabled: enabled, } return &sh } @@ -159,7 +159,7 @@ func (h *historian) GetTableForPos(tableName sqlparser.IdentifierCS, gtid string // loadFromDB loads all rows from the schema_version table that the historian does not have as yet // caller should have locked h.mu func (h *historian) loadFromDB(ctx context.Context) error { - conn, err := h.conns.Get(ctx, nil) + conn, err := h.taskPool.BorrowConn(ctx, nil) if err != nil { return err } diff --git a/go/vt/vttablet/tabletserver/schema/main_test.go b/go/vt/vttablet/tabletserver/schema/main_test.go index ada5c8085a..c31a70ef0f 100644 --- a/go/vt/vttablet/tabletserver/schema/main_test.go +++ b/go/vt/vttablet/tabletserver/schema/main_test.go @@ -37,9 +37,11 @@ func getTestSchemaEngine(t *testing.T) (*Engine, *fakesqldb.DB, func()) { db.AddQueryPattern(baseShowTablesPattern, &sqltypes.Result{}) db.AddQuery(mysql.BaseShowPrimary, &sqltypes.Result{}) AddFakeInnoDBReadRowsResult(db, 1) - se := newEngine(10, 10*time.Second, 10*time.Second, db) + se, taskPool := newEngine(10, 10*time.Second, 10*time.Second, db) + taskPool.Open() require.NoError(t, se.Open()) cancel := func() { + defer taskPool.Close() defer db.Close() defer se.Close() } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index ad44337254..7e182c474b 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -193,7 +193,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to tsv.statefulql = NewQueryList("oltp-stateful") tsv.olapql = NewQueryList("olap") tsv.hs = newHealthStreamer(tsv, alias, tsv.taskPool) - tsv.se = schema.NewEngine(tsv) + tsv.se = schema.NewEngine(tsv, tsv.taskPool) tsv.rt = repltracker.NewReplTracker(tsv, alias) tsv.lagThrottler = throttle.NewThrottler(tsv, srvTopoServer, topoServer, alias.Cell, tsv.rt.HeartbeatWriter(), tabletTypeFunc) tsv.vstreamer = vstreamer.NewEngine(tsv, srvTopoServer, tsv.se, tsv.lagThrottler, alias.Cell) diff --git a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go index d69d226cb1..8ea439d22f 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go +++ b/go/vt/vttablet/tabletserver/vstreamer/testenv/testenv.go @@ -36,6 +36,7 @@ import ( "vitess.io/vitess/go/vt/srvtopo" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/memorytopo" + "vitess.io/vitess/go/vt/vttablet/tabletserver/background" "vitess.io/vitess/go/vt/vttablet/tabletserver/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" "vitess.io/vitess/go/vt/vttest" @@ -54,6 +55,7 @@ type Env struct { Cells []string TabletEnv tabletenv.Env + TaskPool *background.TaskPool TopoServ *topo.Server SrvTopo srvtopo.Server Dbcfgs *dbconfigs.DBConfigs @@ -113,6 +115,7 @@ func Init() (*Env, error) { config := tabletenv.NewDefaultConfig() config.DB = te.Dbcfgs te.TabletEnv = tabletenv.NewEnv(config, "VStreamerTest") + te.TaskPool = background.NewTaskPool(te.TabletEnv) te.Mysqld = mysqlctl.NewMysqld(te.Dbcfgs) pos, _ := te.Mysqld.PrimaryPosition() te.Flavor = pos.GTIDSet.Flavor() @@ -138,8 +141,10 @@ func Init() (*Env, error) { return nil, fmt.Errorf("could not parse database patch version from '%s': %v", dbVersionStr, err) } - te.SchemaEngine = schema.NewEngine(te.TabletEnv) + te.SchemaEngine = schema.NewEngine(te.TabletEnv, te.TaskPool) te.SchemaEngine.InitDBConfig(te.Dbcfgs.DbaWithDB()) + + te.TaskPool.Open() if err := te.SchemaEngine.Open(); err != nil { return nil, err } @@ -157,6 +162,7 @@ func Init() (*Env, error) { // Close tears down TestEnv. func (te *Env) Close() { te.SchemaEngine.Close() + te.TaskPool.Close() te.Mysqld.Close() te.cluster.TearDown() os.RemoveAll(te.cluster.Config.SchemaDir) From 687251548b0c90daae3a9373d033a2ca621fe5e0 Mon Sep 17 00:00:00 2001 From: jairui Date: Mon, 25 Nov 2024 23:04:30 +0800 Subject: [PATCH 2/2] fix ut Signed-off-by: jairui --- go/vt/vttablet/tabletserver/schema/engine_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/go/vt/vttablet/tabletserver/schema/engine_test.go b/go/vt/vttablet/tabletserver/schema/engine_test.go index 25e65b243e..191a74d8ec 100644 --- a/go/vt/vttablet/tabletserver/schema/engine_test.go +++ b/go/vt/vttablet/tabletserver/schema/engine_test.go @@ -563,6 +563,7 @@ func newEngine(queryCacheSize int, reloadTime time.Duration, idleTimeout time.Du config.OltpReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.OlapReadPool.IdleTimeoutSeconds.Set(idleTimeout) config.TxPool.IdleTimeoutSeconds.Set(idleTimeout) + config.DB = newDBConfigs(db) env := tabletenv.NewEnv(config, "SchemaTest") taskPool := background.NewTaskPool(env) se := NewEngine(env, taskPool)