Skip to content

Commit

Permalink
statistics: optimize stats delta dumping with batch processing (#58996)
Browse files Browse the repository at this point in the history
ref #57869
  • Loading branch information
Rustin170506 authored Jan 17, 2025
1 parent 0950fe3 commit f069f0b
Show file tree
Hide file tree
Showing 52 changed files with 494 additions and 606 deletions.
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ type SharedVars struct {
TableImporter *importer.TableImporter
DataEngine *backend.OpenedEngine
IndexEngine *backend.OpenedEngine
Progress *importer.Progress

mu sync.Mutex
Checksum *verification.KVGroupChecksum
Expand Down Expand Up @@ -183,5 +182,4 @@ type Checksum struct {
// This portion of the code may be implemented uniformly in the framework in the future.
type Result struct {
LoadedRowCnt uint64
ColSizeMap map[int64]int64
}
8 changes: 1 addition & 7 deletions pkg/disttask/importinto/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet
}
subtaskMetas = append(subtaskMetas, &subtaskMeta)
}
columnSizeMap := make(map[int64]int64)
for _, subtaskMeta := range subtaskMetas {
taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt
for key, val := range subtaskMeta.Result.ColSizeMap {
columnSizeMap[key] += val
}
}
taskMeta.Result.ColSizeMap = columnSizeMap

if globalSort {
taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task)
Expand Down Expand Up @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger
func(ctx context.Context) (bool, error) {
return true, taskHandle.WithNewSession(func(se sessionctx.Context) error {
if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{
Affected: taskMeta.Result.LoadedRowCnt,
ColSizeMap: taskMeta.Result.ColSizeMap,
Affected: taskMeta.Result.LoadedRowCnt,
}); err != nil {
logger.Warn("flush table stats failed", zap.Error(err))
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/subtask_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
sharedVars.DataEngine,
sharedVars.IndexEngine,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand All @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr
sharedVars.TableImporter,
dataWriter,
indexWriter,
sharedVars.Progress,
logger,
checksum,
); err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/disttask/importinto/task_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt
TableImporter: s.tableImporter,
DataEngine: dataEngine,
IndexEngine: indexEngine,
Progress: importer.NewProgress(),
Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()),
SortedDataMeta: &external.SortedKVMeta{},
SortedIndexMetas: make(map[int64]*external.SortedKVMeta),
Expand Down Expand Up @@ -251,7 +250,6 @@ func (s *importStepExecutor) OnFinished(ctx context.Context, subtask *proto.Subt
}
subtaskMeta.Result = Result{
LoadedRowCnt: dataKVCount,
ColSizeMap: sharedVars.Progress.GetColSize(),
}
allocators := sharedVars.TableImporter.Allocators()
subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{
Expand Down
28 changes: 28 additions & 0 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2370,6 +2370,7 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
return nil
}
do.SetStatsUpdating(true)
do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram")
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
Expand Down Expand Up @@ -2522,6 +2523,33 @@ func (do *Domain) loadStatsWorker() {
if err != nil {
logutil.BgLogger().Warn("update stats info failed", zap.Error(err))
}
case <-do.exit:
return
}
}
}

func (do *Domain) asyncLoadHistogram() {
defer util.Recover(metrics.LabelDomain, "asyncLoadStats", nil, false)
lease := do.statsLease
if lease == 0 {
lease = 3 * time.Second
}
cleanupTicker := time.NewTicker(lease)
defer func() {
cleanupTicker.Stop()
logutil.BgLogger().Info("asyncLoadStats exited.")
}()
select {
case <-do.StatsHandle().InitStatsDone:
case <-do.exit: // It may happen that before initStatsDone, tidb receive Ctrl+C
return
}
statsHandle := do.StatsHandle()
var err error
for {
select {
case <-cleanupTicker.C:
err = statsHandle.LoadNeededHistograms(do.InfoSchema())
if err != nil {
logutil.BgLogger().Warn("load histograms failed", zap.Error(err))
Expand Down
2 changes: 1 addition & 1 deletion pkg/executor/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl
return err
}

err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption)
err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout)
if err != nil {
return err
}
Expand Down
1 change: 0 additions & 1 deletion pkg/executor/importer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ go_library(
"job.go",
"kv_encode.go",
"precheck.go",
"progress.go",
"table_import.go",
],
importpath = "github.com/pingcap/tidb/pkg/executor/importer",
Expand Down
5 changes: 1 addition & 4 deletions pkg/executor/importer/engine_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func ProcessChunk(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataEngine, indexEngine *backend.OpenedEngine,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -65,7 +64,7 @@ func ProcessChunk(
}
}()

return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum)
return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum)
}

// ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter.
Expand All @@ -74,7 +73,6 @@ func ProcessChunkWithWriter(
chunk *checkpoints.ChunkCheckpoint,
tableImporter *TableImporter,
dataWriter, indexWriter backend.EngineWriter,
progress *Progress,
logger *zap.Logger,
groupChecksum *verification.KVGroupChecksum,
) error {
Expand Down Expand Up @@ -116,6 +114,5 @@ func ProcessChunkWithWriter(
if err != nil {
return err
}
progress.AddColSize(encoder.GetColumnSize())
return nil
}
5 changes: 2 additions & 3 deletions pkg/executor/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType {

// JobImportResult is the result of the job import.
type JobImportResult struct {
Affected uint64
Warnings []contextutil.SQLWarn
ColSizeMap variable.DeltaColsMap
Affected uint64
Warnings []contextutil.SQLWarn
}

// GetMsgFromBRError get msg from BR error.
Expand Down
8 changes: 2 additions & 6 deletions pkg/executor/importer/importer_testkit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,11 +314,9 @@ func TestProcessChunkWith(t *testing.T) {
defer ti.Backend().CloseEngineMgr()
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID])
Expand Down Expand Up @@ -346,11 +344,9 @@ func TestProcessChunkWith(t *testing.T) {
ti.SetSelectedRowCh(rowsCh)
kvWriter := mock.NewMockEngineWriter(ctrl)
kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
progress := importer.NewProgress()
checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum)
err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum)
require.NoError(t, err)
require.Len(t, progress.GetColSize(), 3)
checksumMap := checksum.GetInnerChecksums()
require.Len(t, checksumMap, 1)
require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID])
Expand Down
6 changes: 0 additions & 6 deletions pkg/executor/importer/kv_encode.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ import (
// KVEncoder encodes a row of data into a KV pair.
type KVEncoder interface {
Encode(row []types.Datum, rowID int64) (*kv.Pairs, error)
// GetColumnSize returns the size of each column in the current encoder.
GetColumnSize() map[int64]int64
io.Closer
}

Expand Down Expand Up @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err
return en.Record2KV(record, row, rowID)
}

func (en *tableKVEncoder) GetColumnSize() map[int64]int64 {
return en.SessionCtx.GetColumnSize(en.TableMeta().ID)
}

// todo merge with code in load_data.go
func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) {
row := make([]types.Datum, 0, len(en.insertColumns))
Expand Down
49 changes: 0 additions & 49 deletions pkg/executor/importer/progress.go

This file was deleted.

16 changes: 5 additions & 11 deletions pkg/executor/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,25 +654,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

var (
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
colSizeMap = make(map[int64]int64)
mu sync.Mutex
checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
)
eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx)
for i := 0; i < ti.ThreadCnt; i++ {
eg.Go(func() error {
chunkCheckpoint := checkpoints.ChunkCheckpoint{}
chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace)
progress := NewProgress()
defer func() {
mu.Lock()
defer mu.Unlock()
checksum.Add(chunkChecksum)
for k, v := range progress.GetColSize() {
colSizeMap[k] += v
}
}()
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum)
return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum)
})
}
if err = eg.Wait(); err != nil {
Expand Down Expand Up @@ -716,8 +711,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C
}

return &JobImportResult{
Affected: uint64(dataKVCount),
ColSizeMap: colSizeMap,
Affected: uint64(dataKVCount),
}, nil
}

Expand Down Expand Up @@ -976,7 +970,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64,
sessionVars := se.GetSessionVars()
sessionVars.TxnCtxMu.Lock()
defer sessionVars.TxnCtxMu.Unlock()
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap)
sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected))
se.StmtCommit(ctx)
return se.CommitTxn(ctx)
}
Loading

0 comments on commit f069f0b

Please sign in to comment.