From 1f04e14bef3d601012564e64d3de4ed60b4cbd70 Mon Sep 17 00:00:00 2001 From: Song Gao Date: Thu, 1 Dec 2022 13:04:01 +0800 Subject: [PATCH] domain: support dump historical stats in background job (#39417) --- domain/BUILD.bazel | 1 + domain/domain.go | 49 ++++++++++++++++++++++ domain/historical_stats.go | 63 ++++++++++++++++++++++++++++ executor/analyze.go | 16 +------ executor/analyzetest/analyze_test.go | 5 +++ session/session.go | 5 ++- testkit/mockstore.go | 1 + 7 files changed, 125 insertions(+), 15 deletions(-) create mode 100644 domain/historical_stats.go diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index 4b575f4dc63e0..97fced3ad05d4 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -6,6 +6,7 @@ go_library( "domain.go", "domain_sysvars.go", "domainctx.go", + "historical_stats.go", "optimize_trace.go", "plan_replayer.go", "plan_replayer_dump.go", diff --git a/domain/domain.go b/domain/domain.go index 66fcf3ca0e3b3..0f0425178f06f 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -119,6 +119,7 @@ type Domain struct { planReplayerHandle *planReplayerHandle expiredTimeStamp4PC types.Time logBackupAdvancer *daemon.OwnerDaemon + historicalStatsWorker *HistoricalStatsWorker serverID uint64 serverIDSession *concurrency.Session @@ -1586,6 +1587,14 @@ func (do *Domain) SetupPlanReplayerHandle(collectorSctx, dumperSctx sessionctx.C } } +// SetupHistoricalStatsWorker setups worker +func (do *Domain) SetupHistoricalStatsWorker(ctx sessionctx.Context) { + do.historicalStatsWorker = &HistoricalStatsWorker{ + tblCH: make(chan int64, 16), + sctx: ctx, + } +} + // SetupDumpFileGCChecker setup sctx func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) { do.dumpFileGcChecker.setupSctx(ctx) @@ -1595,6 +1604,7 @@ var planReplayerHandleLease atomic.Uint64 func init() { planReplayerHandleLease.Store(uint64(10 * time.Second)) + enableDumpHistoricalStats.Store(true) } // DisablePlanReplayerBackgroundJob4Test disable plan replayer handle for test @@ -1602,6 +1612,11 @@ func DisablePlanReplayerBackgroundJob4Test() { planReplayerHandleLease.Store(0) } +// DisableDumpHistoricalStats4Test disable historical dump worker for test +func DisableDumpHistoricalStats4Test() { + enableDumpHistoricalStats.Store(false) +} + // StartPlanReplayerHandle start plan replayer handle job func (do *Domain) StartPlanReplayerHandle() { lease := planReplayerHandleLease.Load() @@ -1673,6 +1688,40 @@ func (do *Domain) DumpFileGcCheckerLoop() { }() } +// GetHistoricalStatsWorker gets historical workers +func (do *Domain) GetHistoricalStatsWorker() *HistoricalStatsWorker { + return do.historicalStatsWorker +} + +// EnableDumpHistoricalStats used to control whether enbale dump stats for unit test +var enableDumpHistoricalStats atomic.Bool + +// StartHistoricalStatsWorker start historical workers running +func (do *Domain) StartHistoricalStatsWorker() { + if !enableDumpHistoricalStats.Load() { + return + } + do.wg.Add(1) + go func() { + defer func() { + do.wg.Done() + logutil.BgLogger().Info("HistoricalStatsWorker exited.") + util.Recover(metrics.LabelDomain, "HistoricalStatsWorkerLoop", nil, false) + }() + for { + select { + case <-do.exit: + return + case tblID := <-do.historicalStatsWorker.tblCH: + err := do.historicalStatsWorker.DumpHistoricalStats(tblID, do.StatsHandle()) + if err != nil { + logutil.BgLogger().Warn("dump historical stats failed", zap.Error(err), zap.Int64("tableID", tblID)) + } + } + } + }() +} + // StatsHandle returns the statistic handle. func (do *Domain) StatsHandle() *handle.Handle { return (*handle.Handle)(atomic.LoadPointer(&do.statsHandle)) diff --git a/domain/historical_stats.go b/domain/historical_stats.go new file mode 100644 index 0000000000000..04d50608c58c4 --- /dev/null +++ b/domain/historical_stats.go @@ -0,0 +1,63 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package domain + +import ( + "github.com/pingcap/errors" + "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/statistics/handle" +) + +// HistoricalStatsWorker indicates for dump historical stats +type HistoricalStatsWorker struct { + tblCH chan int64 + sctx sessionctx.Context +} + +// SendTblToDumpHistoricalStats send tableID to worker to dump historical stats +func (w *HistoricalStatsWorker) SendTblToDumpHistoricalStats(tableID int64) { + w.tblCH <- tableID +} + +// DumpHistoricalStats dump stats by given tableID +func (w *HistoricalStatsWorker) DumpHistoricalStats(tableID int64, statsHandle *handle.Handle) error { + historicalStatsEnabled, err := statsHandle.CheckHistoricalStatsEnable() + if err != nil { + return errors.Errorf("check tidb_enable_historical_stats failed: %v", err) + } + if !historicalStatsEnabled { + return nil + } + sctx := w.sctx + is := GetDomain(sctx).InfoSchema() + tbl, existed := is.TableByID(tableID) + if !existed { + return errors.Errorf("cannot get table by id %d", tableID) + } + tblInfo := tbl.Meta() + dbInfo, existed := is.SchemaByTable(tblInfo) + if !existed { + return errors.Errorf("cannot get DBInfo by TableID %d", tableID) + } + if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { + return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) + } + return nil +} + +// GetOneHistoricalStatsTable gets one tableID from channel, only used for test +func (w *HistoricalStatsWorker) GetOneHistoricalStatsTable() int64 { + return <-w.tblCH +} diff --git a/executor/analyze.go b/executor/analyze.go index f08f1ad932a9c..28a0a44066c62 100644 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -267,20 +267,8 @@ func recordHistoricalStats(sctx sessionctx.Context, tableID int64) error { if !historicalStatsEnabled { return nil } - - is := domain.GetDomain(sctx).InfoSchema() - tbl, existed := is.TableByID(tableID) - if !existed { - return errors.Errorf("cannot get table by id %d", tableID) - } - tblInfo := tbl.Meta() - dbInfo, existed := is.SchemaByTable(tblInfo) - if !existed { - return errors.Errorf("cannot get DBInfo by TableID %d", tableID) - } - if _, err := statsHandle.RecordHistoricalStatsToStorage(dbInfo.Name.O, tblInfo); err != nil { - return errors.Errorf("record table %s.%s's historical stats failed", dbInfo.Name.O, tblInfo.Name.O) - } + historicalStatsWorker := domain.GetDomain(sctx).GetHistoricalStatsWorker() + historicalStatsWorker.SendTblToDumpHistoricalStats(tableID) return nil } diff --git a/executor/analyzetest/analyze_test.go b/executor/analyzetest/analyze_test.go index cc447cdf39aea..63df848541013 100644 --- a/executor/analyzetest/analyze_test.go +++ b/executor/analyzetest/analyze_test.go @@ -2192,6 +2192,11 @@ func TestRecordHistoryStatsAfterAnalyze(t *testing.T) { tk.MustExec("set global tidb_enable_historical_stats = 1") defer tk.MustExec("set global tidb_enable_historical_stats = 0") tk.MustExec("analyze table t with 2 topn") + // dump historical stats + hsWorker := dom.GetHistoricalStatsWorker() + tblID := hsWorker.GetOneHistoricalStatsTable() + err = hsWorker.DumpHistoricalStats(tblID, h) + require.Nil(t, err) rows = tk.MustQuery(fmt.Sprintf("select count(*) from mysql.stats_history where table_id = '%d'", tableInfo.Meta().ID)).Rows() num, _ = strconv.Atoi(rows[0][0].(string)) require.GreaterOrEqual(t, num, 1) diff --git a/session/session.go b/session/session.go index 9104c4186bf74..3dd2f6a09bd68 100644 --- a/session/session.go +++ b/session/session.go @@ -2950,7 +2950,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { analyzeConcurrencyQuota := int(config.GetGlobalConfig().Performance.AnalyzePartitionConcurrencyQuota) concurrency := int(config.GetGlobalConfig().Performance.StatsLoadConcurrency) - ses, err := createSessions(store, 9) + ses, err := createSessions(store, 10) if err != nil { return nil, err } @@ -3030,6 +3030,9 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) { // setup dumpFileGcChecker dom.SetupDumpFileGCChecker(ses[8]) dom.DumpFileGcCheckerLoop() + // setup historical stats worker + dom.SetupHistoricalStatsWorker(ses[9]) + dom.StartHistoricalStatsWorker() // A sub context for update table stats, and other contexts for concurrent stats loading. cnt := 1 + concurrency diff --git a/testkit/mockstore.go b/testkit/mockstore.go index 525381dd9c148..12afe0e0f2f68 100644 --- a/testkit/mockstore.go +++ b/testkit/mockstore.go @@ -80,6 +80,7 @@ func bootstrap(t testing.TB, store kv.Storage, lease time.Duration) *domain.Doma session.SetSchemaLease(lease) session.DisableStats4Test() domain.DisablePlanReplayerBackgroundJob4Test() + domain.DisableDumpHistoricalStats4Test() dom, err := session.BootstrapSession(store) require.NoError(t, err)