diff --git a/pkg/disttask/importinto/proto.go b/pkg/disttask/importinto/proto.go index 48378f44d8649..e8c9d3f9794e4 100644 --- a/pkg/disttask/importinto/proto.go +++ b/pkg/disttask/importinto/proto.go @@ -110,7 +110,6 @@ type SharedVars struct { TableImporter *importer.TableImporter DataEngine *backend.OpenedEngine IndexEngine *backend.OpenedEngine - Progress *importer.Progress mu sync.Mutex Checksum *verification.KVGroupChecksum @@ -183,5 +182,4 @@ type Checksum struct { // This portion of the code may be implemented uniformly in the framework in the future. type Result struct { LoadedRowCnt uint64 - ColSizeMap map[int64]int64 } diff --git a/pkg/disttask/importinto/scheduler.go b/pkg/disttask/importinto/scheduler.go index 1dd1c0021ae7f..2567806b8063c 100644 --- a/pkg/disttask/importinto/scheduler.go +++ b/pkg/disttask/importinto/scheduler.go @@ -577,14 +577,9 @@ func updateResult(handle storage.TaskHandle, task *proto.Task, taskMeta *TaskMet } subtaskMetas = append(subtaskMetas, &subtaskMeta) } - columnSizeMap := make(map[int64]int64) for _, subtaskMeta := range subtaskMetas { taskMeta.Result.LoadedRowCnt += subtaskMeta.Result.LoadedRowCnt - for key, val := range subtaskMeta.Result.ColSizeMap { - columnSizeMap[key] += val - } } - taskMeta.Result.ColSizeMap = columnSizeMap if globalSort { taskMeta.Result.LoadedRowCnt, err = getLoadedRowCountOnGlobalSort(handle, task) @@ -662,8 +657,7 @@ func (sch *ImportSchedulerExt) finishJob(ctx context.Context, logger *zap.Logger func(ctx context.Context) (bool, error) { return true, taskHandle.WithNewSession(func(se sessionctx.Context) error { if err := importer.FlushTableStats(ctx, se, taskMeta.Plan.TableInfo.ID, &importer.JobImportResult{ - Affected: taskMeta.Result.LoadedRowCnt, - ColSizeMap: taskMeta.Result.ColSizeMap, + Affected: taskMeta.Result.LoadedRowCnt, }); err != nil { logger.Warn("flush table stats failed", zap.Error(err)) } diff --git a/pkg/disttask/importinto/subtask_executor.go b/pkg/disttask/importinto/subtask_executor.go index 4978a2c09aaa9..3dcb7347ca589 100644 --- a/pkg/disttask/importinto/subtask_executor.go +++ b/pkg/disttask/importinto/subtask_executor.go @@ -69,7 +69,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr sharedVars.TableImporter, sharedVars.DataEngine, sharedVars.IndexEngine, - sharedVars.Progress, logger, checksum, ); err != nil { @@ -82,7 +81,6 @@ func (e *importMinimalTaskExecutor) Run(ctx context.Context, dataWriter, indexWr sharedVars.TableImporter, dataWriter, indexWriter, - sharedVars.Progress, logger, checksum, ); err != nil { diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go index 71247f0cc549e..e78eba90584cb 100644 --- a/pkg/disttask/importinto/task_executor.go +++ b/pkg/disttask/importinto/task_executor.go @@ -154,7 +154,6 @@ func (s *importStepExecutor) RunSubtask(ctx context.Context, subtask *proto.Subt TableImporter: s.tableImporter, DataEngine: dataEngine, IndexEngine: indexEngine, - Progress: importer.NewProgress(), Checksum: verification.NewKVGroupChecksumWithKeyspace(s.tableImporter.GetKeySpace()), SortedDataMeta: &external.SortedKVMeta{}, SortedIndexMetas: make(map[int64]*external.SortedKVMeta), @@ -251,7 +250,6 @@ func (s *importStepExecutor) onFinished(ctx context.Context, subtask *proto.Subt } subtaskMeta.Result = Result{ LoadedRowCnt: dataKVCount, - ColSizeMap: sharedVars.Progress.GetColSize(), } allocators := sharedVars.TableImporter.Allocators() subtaskMeta.MaxIDs = map[autoid.AllocatorType]int64{ diff --git a/pkg/executor/delete.go b/pkg/executor/delete.go index ea6d6861702e5..04c6a8efa2c51 100644 --- a/pkg/executor/delete.go +++ b/pkg/executor/delete.go @@ -274,7 +274,7 @@ func (e *DeleteExec) removeRow(ctx sessionctx.Context, t table.Table, h kv.Handl return err } - err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.ExtraPartialRowOption) + err = t.RemoveRecord(ctx.GetTableCtx(), txn, h, data, posInfo.IndexesRowLayout) if err != nil { return err } diff --git a/pkg/executor/importer/BUILD.bazel b/pkg/executor/importer/BUILD.bazel index be7191b77e60a..61e9e8fada8b4 100644 --- a/pkg/executor/importer/BUILD.bazel +++ b/pkg/executor/importer/BUILD.bazel @@ -9,7 +9,6 @@ go_library( "job.go", "kv_encode.go", "precheck.go", - "progress.go", "table_import.go", ], importpath = "github.com/pingcap/tidb/pkg/executor/importer", diff --git a/pkg/executor/importer/engine_process.go b/pkg/executor/importer/engine_process.go index 93e6ca802cfeb..8d00be701c367 100644 --- a/pkg/executor/importer/engine_process.go +++ b/pkg/executor/importer/engine_process.go @@ -30,7 +30,6 @@ func ProcessChunk( chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataEngine, indexEngine *backend.OpenedEngine, - progress *Progress, logger *zap.Logger, groupChecksum *verification.KVGroupChecksum, ) error { @@ -65,7 +64,7 @@ func ProcessChunk( } }() - return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, progress, logger, groupChecksum) + return ProcessChunkWithWriter(ctx, chunk, tableImporter, dataWriter, indexWriter, logger, groupChecksum) } // ProcessChunkWithWriter processes a chunk, and write kv pairs to dataWriter and indexWriter. @@ -74,7 +73,6 @@ func ProcessChunkWithWriter( chunk *checkpoints.ChunkCheckpoint, tableImporter *TableImporter, dataWriter, indexWriter backend.EngineWriter, - progress *Progress, logger *zap.Logger, groupChecksum *verification.KVGroupChecksum, ) error { @@ -116,6 +114,5 @@ func ProcessChunkWithWriter( if err != nil { return err } - progress.AddColSize(encoder.GetColumnSize()) return nil } diff --git a/pkg/executor/importer/import.go b/pkg/executor/importer/import.go index 6ef74ba91f0a3..b36598715309b 100644 --- a/pkg/executor/importer/import.go +++ b/pkg/executor/importer/import.go @@ -1395,9 +1395,8 @@ func getDataSourceType(p *plannercore.ImportInto) DataSourceType { // JobImportResult is the result of the job import. type JobImportResult struct { - Affected uint64 - Warnings []contextutil.SQLWarn - ColSizeMap variable.DeltaColsMap + Affected uint64 + Warnings []contextutil.SQLWarn } // GetMsgFromBRError get msg from BR error. diff --git a/pkg/executor/importer/importer_testkit_test.go b/pkg/executor/importer/importer_testkit_test.go index fe99739c27b3b..01b00acced6da 100644 --- a/pkg/executor/importer/importer_testkit_test.go +++ b/pkg/executor/importer/importer_testkit_test.go @@ -311,11 +311,9 @@ func TestProcessChunkWith(t *testing.T) { defer ti.Backend().CloseEngineMgr() kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - progress := importer.NewProgress() checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace) - err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum) require.NoError(t, err) - require.Len(t, progress.GetColSize(), 3) checksumMap := checksum.GetInnerChecksums() require.Len(t, checksumMap, 1) require.Equal(t, verify.MakeKVChecksum(74, 2, 15625182175392723123), *checksumMap[verify.DataKVGroupID]) @@ -343,11 +341,9 @@ func TestProcessChunkWith(t *testing.T) { ti.SetSelectedRowCh(rowsCh) kvWriter := mock.NewMockEngineWriter(ctrl) kvWriter.EXPECT().AppendRows(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() - progress := importer.NewProgress() checksum := verify.NewKVGroupChecksumWithKeyspace(keyspace) - err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, progress, zap.NewExample(), checksum) + err := importer.ProcessChunkWithWriter(ctx, chunkInfo, ti, kvWriter, kvWriter, zap.NewExample(), checksum) require.NoError(t, err) - require.Len(t, progress.GetColSize(), 3) checksumMap := checksum.GetInnerChecksums() require.Len(t, checksumMap, 1) require.Equal(t, verify.MakeKVChecksum(111, 3, 14585065391351463171), *checksumMap[verify.DataKVGroupID]) diff --git a/pkg/executor/importer/kv_encode.go b/pkg/executor/importer/kv_encode.go index 88d43c33409f8..a1c908ad0d4bf 100644 --- a/pkg/executor/importer/kv_encode.go +++ b/pkg/executor/importer/kv_encode.go @@ -35,8 +35,6 @@ import ( // KVEncoder encodes a row of data into a KV pair. type KVEncoder interface { Encode(row []types.Datum, rowID int64) (*kv.Pairs, error) - // GetColumnSize returns the size of each column in the current encoder. - GetColumnSize() map[int64]int64 io.Closer } @@ -91,10 +89,6 @@ func (en *tableKVEncoder) Encode(row []types.Datum, rowID int64) (*kv.Pairs, err return en.Record2KV(record, row, rowID) } -func (en *tableKVEncoder) GetColumnSize() map[int64]int64 { - return en.SessionCtx.GetColumnSize(en.TableMeta().ID) -} - // todo merge with code in load_data.go func (en *tableKVEncoder) parserData2TableData(parserData []types.Datum, rowID int64) ([]types.Datum, error) { row := make([]types.Datum, 0, len(en.insertColumns)) diff --git a/pkg/executor/importer/progress.go b/pkg/executor/importer/progress.go deleted file mode 100644 index 8ee0e882ef925..0000000000000 --- a/pkg/executor/importer/progress.go +++ /dev/null @@ -1,49 +0,0 @@ -// Copyright 2023 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package importer - -import ( - "maps" - "sync" -) - -// Progress is the progress of the IMPORT INTO task. -type Progress struct { - colSizeMu sync.Mutex - colSizeMap map[int64]int64 -} - -// NewProgress creates a new Progress. -func NewProgress() *Progress { - return &Progress{ - colSizeMap: make(map[int64]int64), - } -} - -// AddColSize adds the size of the column to the progress. -func (p *Progress) AddColSize(colSizeMap map[int64]int64) { - p.colSizeMu.Lock() - defer p.colSizeMu.Unlock() - for key, value := range colSizeMap { - p.colSizeMap[key] += value - } -} - -// GetColSize returns the size of the column. -func (p *Progress) GetColSize() map[int64]int64 { - p.colSizeMu.Lock() - defer p.colSizeMu.Unlock() - return maps.Clone(p.colSizeMap) -} diff --git a/pkg/executor/importer/table_import.go b/pkg/executor/importer/table_import.go index db4dd815f03c0..6d20d0df1c33c 100644 --- a/pkg/executor/importer/table_import.go +++ b/pkg/executor/importer/table_import.go @@ -655,25 +655,20 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C } var ( - mu sync.Mutex - checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) - colSizeMap = make(map[int64]int64) + mu sync.Mutex + checksum = verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) ) eg, egCtx := tidbutil.NewErrorGroupWithRecoverWithCtx(ctx) for i := 0; i < ti.ThreadCnt; i++ { eg.Go(func() error { chunkCheckpoint := checkpoints.ChunkCheckpoint{} chunkChecksum := verify.NewKVGroupChecksumWithKeyspace(ti.keyspace) - progress := NewProgress() defer func() { mu.Lock() defer mu.Unlock() checksum.Add(chunkChecksum) - for k, v := range progress.GetColSize() { - colSizeMap[k] += v - } }() - return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, progress, ti.logger, chunkChecksum) + return ProcessChunk(egCtx, &chunkCheckpoint, ti, dataEngine, indexEngine, ti.logger, chunkChecksum) }) } if err = eg.Wait(); err != nil { @@ -717,8 +712,7 @@ func (ti *TableImporter) ImportSelectedRows(ctx context.Context, se sessionctx.C } return &JobImportResult{ - Affected: uint64(dataKVCount), - ColSizeMap: colSizeMap, + Affected: uint64(dataKVCount), }, nil } @@ -977,7 +971,7 @@ func FlushTableStats(ctx context.Context, se sessionctx.Context, tableID int64, sessionVars := se.GetSessionVars() sessionVars.TxnCtxMu.Lock() defer sessionVars.TxnCtxMu.Unlock() - sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected), result.ColSizeMap) + sessionVars.TxnCtx.UpdateDeltaForTable(tableID, int64(result.Affected), int64(result.Affected)) se.StmtCommit(ctx) return se.CommitTxn(ctx) } diff --git a/pkg/executor/infoschema_reader_test.go b/pkg/executor/infoschema_reader_test.go index 5458a97d4a25c..f016696520c84 100644 --- a/pkg/executor/infoschema_reader_test.go +++ b/pkg/executor/infoschema_reader_test.go @@ -184,20 +184,23 @@ func TestDataForTableStatsField(t *testing.T) { require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("3 18 54 6")) + testkit.Rows("3 16 48 0")) tk.MustExec(`insert into t(c, d, e) values(4, 5, "f")`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("4 18 72 8")) + testkit.Rows("4 16 64 0")) tk.MustExec("delete from t where c >= 3") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( - testkit.Rows("2 18 36 4")) + testkit.Rows("2 16 32 0")) tk.MustExec("delete from t where c=3") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) + tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( + testkit.Rows("2 16 32 0")) + tk.MustExec("analyze table t all columns") tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( testkit.Rows("2 18 36 4")) @@ -209,6 +212,9 @@ func TestDataForTableStatsField(t *testing.T) { tk.MustExec(`insert into t(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e")`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) + tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( + testkit.Rows("3 16 48 0")) + tk.MustExec("analyze table t all columns") tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.tables where table_name='t'").Check( testkit.Rows("3 18 54 6")) } @@ -229,23 +235,24 @@ func TestPartitionsTable(t *testing.T) { tk.MustExec(`insert into test_partitions(a, b, c) values(1, 2, "c"), (7, 3, "d"), (12, 4, "e");`) tk.MustQuery("select PARTITION_NAME, PARTITION_DESCRIPTION from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "p0 6]\n" + - "[p1 11]\n" + - "[p2 16")) + testkit.Rows("p0 6", "p1 11", "p2 16")) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "0 0 0 0]\n" + - "[0 0 0 0]\n" + - "[0 0 0 0")) + testkit.Rows( + "0 0 0 0", + "0 0 0 0", + "0 0 0 0", + ), + ) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select table_rows, avg_row_length, data_length, index_length from information_schema.PARTITIONS where table_name='test_partitions';").Check( - testkit.Rows("" + - "1 18 18 2]\n" + - "[1 18 18 2]\n" + - "[1 18 18 2")) + testkit.Rows( + "1 16 16 0", + "1 16 16 0", + "1 16 16 0", + ), + ) }) // Test for table has no partitions. @@ -257,7 +264,7 @@ func TestPartitionsTable(t *testing.T) { require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), is)) tk.MustQuery("select PARTITION_NAME, TABLE_ROWS, AVG_ROW_LENGTH, DATA_LENGTH, INDEX_LENGTH from information_schema.PARTITIONS where table_name='test_partitions_1';").Check( - testkit.Rows(" 3 18 54 6")) + testkit.Rows(" 3 16 48 0")) tk.MustExec("DROP TABLE IF EXISTS `test_partitions`;") tk.MustExec(`CREATE TABLE test_partitions1 (id int, b int, c varchar(5), primary key(id), index idx(c)) PARTITION BY RANGE COLUMNS(id) (PARTITION p0 VALUES LESS THAN (6), PARTITION p1 VALUES LESS THAN (11), PARTITION p2 VALUES LESS THAN (16));`) diff --git a/pkg/executor/internal/exec/executor.go b/pkg/executor/internal/exec/executor.go index 3b3673dc1edee..71187f57fdec5 100644 --- a/pkg/executor/internal/exec/executor.go +++ b/pkg/executor/internal/exec/executor.go @@ -378,7 +378,7 @@ func (e *BaseExecutor) Ctx() sessionctx.Context { // UpdateDeltaForTableID updates the delta info for the table with tableID. func (e *BaseExecutor) UpdateDeltaForTableID(id int64) { txnCtx := e.ctx.GetSessionVars().TxnCtx - txnCtx.UpdateDeltaForTable(id, 0, 0, nil) + txnCtx.UpdateDeltaForTable(id, 0, 0) } // GetSysSession gets a system session context from executor. diff --git a/pkg/executor/test/analyzetest/analyze_test.go b/pkg/executor/test/analyzetest/analyze_test.go index 669720ff30d48..14cc0793af312 100644 --- a/pkg/executor/test/analyzetest/analyze_test.go +++ b/pkg/executor/test/analyzetest/analyze_test.go @@ -1146,6 +1146,7 @@ func TestAnalyzeColumnsWithPrimaryKey(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int primary key)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,1), (1,1,2), (2,2,3), (2,2,4), (3,3,5), (4,3,6), (5,4,7), (6,4,8), (null,null,9)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1189,7 +1190,7 @@ func TestAnalyzeColumnsWithPrimaryKey(t *testing.T) { "test t c 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( testkit.Rows("0 1 6 1 8 2 1", - "0 2 0 0 8 0 0", // column b is not analyzed + "0 2 0 0 0 0 0", // column b is not analyzed "0 3 9 0 9 2 1", )) tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check( @@ -1213,6 +1214,7 @@ func TestAnalyzeColumnsWithIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int, d int, index idx_b_d(b, d))") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,null,1), (2,1,9,1), (1,1,8,1), (2,2,7,2), (1,3,7,3), (2,4,6,4), (1,4,6,5), (2,4,6,5), (1,5,6,5)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1260,7 +1262,7 @@ func TestAnalyzeColumnsWithIndex(t *testing.T) { "test t idx_b_d 1 (1, 1) 3", "test t idx_b_d 1 (4, 5) 2")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 9 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 5 0 9 2 1", "0 3 4 1 8 2 -0.07", "0 4 5 0 9 2 1", @@ -1289,6 +1291,7 @@ func TestAnalyzeColumnsWithClusteredIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int, d int, primary key(b, d) clustered)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,1,null,1), (2,2,9,2), (1,3,8,3), (2,4,7,4), (1,5,7,5), (2,6,6,6), (1,7,6,7), (2,8,6,8), (1,9,6,9)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1336,7 +1339,7 @@ func TestAnalyzeColumnsWithClusteredIndex(t *testing.T) { "test t d 0 1 1", "test t d 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 9 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 9 0 9 2 1", "0 3 4 1 8 2 -0.07", "0 4 9 0 9 2 1", @@ -1366,6 +1369,7 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_partition_prune_mode = 'dynamic'") tk.MustExec("create table t (a int, b int, c int, index idx(c)) partition by range (a) (partition p0 values less than (10), partition p1 values less than maxvalue)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,2,1), (2,4,1), (3,6,1), (4,8,2), (4,8,2), (5,10,3), (5,10,4), (5,10,5), (null,null,6), (11,22,7), (12,24,8), (13,26,9), (14,28,10), (15,30,11), (16,32,12), (16,32,13), (16,32,13), (16,32,14), (17,34,14), (17,34,14)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1462,15 +1466,16 @@ func TestAnalyzeColumnsWithDynamicPartitionTable(t *testing.T) { "test t p1 idx 1 1 6 1 11 12 0")) tk.MustQuery("select table_id, is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms order by table_id, is_index, hist_id asc").Check( - testkit.Rows(fmt.Sprintf("%d 0 1 12 1 19 2 0", tblID), // global, a + testkit.Rows(fmt.Sprintf("%d 0 1 12 1 19 2 0", tblID), // global, aA + fmt.Sprintf("%d 0 2 0 0 0 0 0", tblID), // global, b, not analyzed fmt.Sprintf("%d 0 3 14 0 20 2 0", tblID), // global, c fmt.Sprintf("%d 1 1 14 0 0 2 0", tblID), // global, idx fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a - fmt.Sprintf("%d 0 2 0 0 8 0 0", p0ID), // p0, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p0ID), // p0, b, not analyzed fmt.Sprintf("%d 0 3 6 0 9 2 1", p0ID), // p0, c fmt.Sprintf("%d 1 1 6 0 9 2 0", p0ID), // p0, idx fmt.Sprintf("%d 0 1 7 0 11 2 1", p1ID), // p1, a - fmt.Sprintf("%d 0 2 0 0 11 0 0", p1ID), // p1, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p1ID), // p1, b, not analyzed fmt.Sprintf("%d 0 3 8 0 11 2 1", p1ID), // p1, c fmt.Sprintf("%d 1 1 8 0 11 2 0", p1ID), // p1, idx )) @@ -1490,12 +1495,14 @@ func TestAnalyzeColumnsWithStaticPartitionTable(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_partition_prune_mode = 'static'") tk.MustExec("create table t (a int, b int, c int, index idx(c)) partition by range (a) (partition p0 values less than (10), partition p1 values less than maxvalue)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t values (1,2,1), (2,4,1), (3,6,1), (4,8,2), (4,8,2), (5,10,3), (5,10,4), (5,10,5), (null,null,6), (11,22,7), (12,24,8), (13,26,9), (14,28,10), (15,30,11), (16,32,12), (16,32,13), (16,32,13), (16,32,14), (17,34,14), (17,34,14)") require.NoError(t, h.DumpStatsDeltaToKV(true)) is := dom.InfoSchema() tbl, err := is.TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) + tblID := tbl.Meta().ID defs := tbl.Meta().Partition.Definitions p0ID := defs[0].ID p1ID := defs[1].ID @@ -1570,12 +1577,16 @@ func TestAnalyzeColumnsWithStaticPartitionTable(t *testing.T) { "test t p1 idx 1 1 6 1 11 12 0")) tk.MustQuery("select table_id, is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms order by table_id, is_index, hist_id asc").Check( - testkit.Rows(fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a - fmt.Sprintf("%d 0 2 0 0 8 0 0", p0ID), // p0, b, not analyzed + testkit.Rows(fmt.Sprintf("%d 0 1 0 0 0 0 0", tblID), // global, a, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", tblID), // global, b, not analyzed + fmt.Sprintf("%d 0 3 0 0 0 0 0", tblID), // global, c, not analyzed + fmt.Sprintf("%d 1 1 0 0 0 0 0", tblID), // global, idx, not analyzed + fmt.Sprintf("%d 0 1 5 1 8 2 1", p0ID), // p0, a + fmt.Sprintf("%d 0 2 0 0 0 0 0", p0ID), // p0, b, not analyzed fmt.Sprintf("%d 0 3 6 0 9 2 1", p0ID), // p0, c fmt.Sprintf("%d 1 1 6 0 9 2 0", p0ID), // p0, idx fmt.Sprintf("%d 0 1 7 0 11 2 1", p1ID), // p1, a - fmt.Sprintf("%d 0 2 0 0 11 0 0", p1ID), // p1, b, not analyzed + fmt.Sprintf("%d 0 2 0 0 0 0 0", p1ID), // p1, b, not analyzed fmt.Sprintf("%d 0 3 8 0 11 2 1", p1ID), // p1, c fmt.Sprintf("%d 1 1 8 0 11 2 0", p1ID), // p1, idx )) @@ -1595,6 +1606,7 @@ func TestAnalyzeColumnsWithExtendedStats(t *testing.T) { tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("set @@tidb_enable_extended_stats = on") tk.MustExec("create table t (a int, b int, c int)") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("alter table t add stats_extended s1 correlation(b,c)") tk.MustExec("insert into t values (5,1,1), (4,2,2), (3,3,3), (2,4,4), (1,5,5)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1637,7 +1649,7 @@ func TestAnalyzeColumnsWithExtendedStats(t *testing.T) { "test t c 0 1 1", "test t c 0 2 1")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 0 0 5 0 0", // column a is not analyzed + testkit.Rows("0 1 0 0 0 0 0", // column a is not analyzed "0 2 5 0 5 2 1", "0 3 5 0 5 2 1", )) @@ -1665,6 +1677,7 @@ func TestAnalyzeColumnsWithVirtualColumnIndex(t *testing.T) { tk.MustExec("drop table if exists t") tk.MustExec("set @@tidb_analyze_version = 2") tk.MustExec("create table t (a int, b int, c int as (b+1), index idx(c))") + statstestutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t (a,b) values (1,1), (2,2), (3,3), (4,4), (5,4), (6,5), (7,5), (8,5), (null,null)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -1781,7 +1794,7 @@ func TestAnalyzeColumnsAfterAnalyzeAll(t *testing.T) { "test t b 0 1 3", "test t b 0 2 2")) tk.MustQuery(fmt.Sprintf("select is_index, hist_id, distinct_count, null_count, tot_col_size, stats_ver, truncate(correlation,2) from mysql.stats_histograms where table_id = %d", tblID)).Sort().Check( - testkit.Rows("0 1 4 0 8 2 1", // tot_col_size of column a is updated to 8 by DumpStatsDeltaToKV + testkit.Rows("0 1 4 0 6 2 1", "0 2 5 0 8 2 0.76")) tk.MustQuery("show stats_buckets where db_name = 'test' and table_name = 't'").Sort().Check( // db, tbl, part, col, is_index, bucket_id, count, repeats, lower, upper, ndv diff --git a/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel b/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel index 75d48f9dd113b..83de820219af5 100644 --- a/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel +++ b/pkg/executor/test/analyzetest/memorycontrol/BUILD.bazel @@ -15,6 +15,7 @@ go_test( "//pkg/executor", "//pkg/sessionctx/variable", "//pkg/statistics", + "//pkg/statistics/handle/ddl/testutil", "//pkg/testkit", "//pkg/util", "@com_github_pingcap_failpoint//:failpoint", diff --git a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go index 1982e8a974871..3edb4c520a8a9 100644 --- a/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go +++ b/pkg/executor/test/analyzetest/memorycontrol/memory_control_test.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/executor" "github.com/pingcap/tidb/pkg/statistics" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" @@ -134,6 +135,8 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t(a int)") + h := dom.StatsHandle() + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t select 1") for i := 1; i <= 8; i++ { tk.MustExec("insert into t select * from t") // 256 Lines @@ -143,7 +146,6 @@ func TestGlobalMemoryControlForAutoAnalyze(t *testing.T) { rs0 := tk.MustQuery("select fail_reason from mysql.analyze_jobs where table_name=? and state=? limit 1", "t", "failed") require.Len(t, rs0.Rows(), 0) - h := dom.StatsHandle() originalVal4 := statistics.AutoAnalyzeMinCnt originalVal5 := tk.MustQuery("select @@global.tidb_auto_analyze_ratio").Rows()[0][0].(string) statistics.AutoAnalyzeMinCnt = 0 diff --git a/pkg/lightning/backend/kv/context.go b/pkg/lightning/backend/kv/context.go index 14c1963723c21..6af19687eef53 100644 --- a/pkg/lightning/backend/kv/context.go +++ b/pkg/lightning/backend/kv/context.go @@ -15,9 +15,7 @@ package kv import ( - "maps" "math/rand" - "sync" "time" "github.com/pingcap/tidb/pkg/errctx" @@ -122,11 +120,6 @@ type litTableMutateContext struct { reservedRowIDAlloc stmtctx.ReservedRowIDAlloc enableMutationChecker bool assertionLevel variable.AssertionLevel - tableDelta struct { - sync.Mutex - // tblID -> (colID -> deltaSize) - m map[int64]map[int64]int64 - } } // AlternativeAllocators implements the `table.MutateContext` interface. @@ -188,25 +181,9 @@ func (ctx *litTableMutateContext) GetStatisticsSupport() (tblctx.StatisticsSuppo } // UpdatePhysicalTableDelta implements the `table.StatisticsSupport` interface. -func (ctx *litTableMutateContext) UpdatePhysicalTableDelta( - physicalTableID int64, _ int64, - _ int64, cols variable.DeltaCols, +func (*litTableMutateContext) UpdatePhysicalTableDelta( + _, _, _ int64, ) { - ctx.tableDelta.Lock() - defer ctx.tableDelta.Unlock() - if ctx.tableDelta.m == nil { - ctx.tableDelta.m = make(map[int64]map[int64]int64) - } - tableMap := ctx.tableDelta.m - colSize := tableMap[physicalTableID] - tableMap[physicalTableID] = cols.UpdateColSizeMap(colSize) -} - -// GetColumnSize returns the colum size map (colID -> deltaSize) for the given table ID. -func (ctx *litTableMutateContext) GetColumnSize(tblID int64) (ret map[int64]int64) { - ctx.tableDelta.Lock() - defer ctx.tableDelta.Unlock() - return maps.Clone(ctx.tableDelta.m[tblID]) } // GetCachedTableSupport implements the `table.MutateContext` interface. diff --git a/pkg/lightning/backend/kv/context_test.go b/pkg/lightning/backend/kv/context_test.go index bb04a4f3d5bbe..fd6d3b5d83406 100644 --- a/pkg/lightning/backend/kv/context_test.go +++ b/pkg/lightning/backend/kv/context_test.go @@ -250,17 +250,8 @@ func TestLitTableMutateContext(t *testing.T) { stats, ok := tblCtx.GetStatisticsSupport() require.True(t, ok) // test for `UpdatePhysicalTableDelta` and `GetColumnSize` - stats.UpdatePhysicalTableDelta(123, 5, 2, variable.DeltaColsMap{1: 2, 3: 4}) - r := tblCtx.GetColumnSize(123) - require.Equal(t, map[int64]int64{1: 2, 3: 4}, r) - stats.UpdatePhysicalTableDelta(123, 8, 2, variable.DeltaColsMap{3: 5, 4: 3}) - r = tblCtx.GetColumnSize(123) - require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, r) - // the result should be a cloned value - r[1] = 100 - require.Equal(t, map[int64]int64{1: 2, 3: 9, 4: 3}, tblCtx.GetColumnSize(123)) - // test gets a non-existed table - require.Empty(t, tblCtx.GetColumnSize(456)) + stats.UpdatePhysicalTableDelta(123, 5, 2) + stats.UpdatePhysicalTableDelta(123, 8, 2) } // test for default diff --git a/pkg/lightning/backend/kv/session.go b/pkg/lightning/backend/kv/session.go index bda638da38609..b2cd3d540c7bf 100644 --- a/pkg/lightning/backend/kv/session.go +++ b/pkg/lightning/backend/kv/session.go @@ -363,11 +363,6 @@ func (s *Session) UnsetUserVar(varName string) { s.exprCtx.unsetUserVar(varName) } -// GetColumnSize returns the size of each column. -func (s *Session) GetColumnSize(tblID int64) (ret map[int64]int64) { - return s.tblCtx.GetColumnSize(tblID) -} - // Close closes the session func (s *Session) Close() { memBuf := &s.txn.MemBuf diff --git a/pkg/planner/core/integration_test.go b/pkg/planner/core/integration_test.go index 38837cc33b38e..c653a4e67e6b8 100644 --- a/pkg/planner/core/integration_test.go +++ b/pkg/planner/core/integration_test.go @@ -35,6 +35,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/planner/core" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" @@ -2165,6 +2166,7 @@ func TestIssue48257(t *testing.T) { // 1. test sync load tk.MustExec("create table t(a int)") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t value(1)") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) @@ -2190,6 +2192,7 @@ func TestIssue48257(t *testing.T) { // 2. test async load tk.MustExec("set tidb_stats_load_sync_wait = 0") tk.MustExec("create table t1(a int)") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec("insert into t1 value(1)") require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) diff --git a/pkg/planner/core/logical_plan_builder.go b/pkg/planner/core/logical_plan_builder.go index 0f6651e5baeab..074bfd2734280 100644 --- a/pkg/planner/core/logical_plan_builder.go +++ b/pkg/planner/core/logical_plan_builder.go @@ -5205,7 +5205,9 @@ type TblColPosInfo struct { // HandleOrdinal represents the ordinal of the handle column. HandleCols util.HandleCols - *table.ExtraPartialRowOption + // IndexesRowLayout store the row layout of indexes. We need it if column pruning happens. + // If it's nil, means no column pruning happens. + IndexesRowLayout table.IndexesLayout } // MemoryUsage return the memory usage of TblColPosInfo @@ -5349,10 +5351,9 @@ func initColPosInfo(tid int64, names []*types.FieldName, handleCol util.HandleCo return TblColPosInfo{}, err } return TblColPosInfo{ - TblID: tid, - Start: offset, - HandleCols: handleCol, - ExtraPartialRowOption: &table.ExtraPartialRowOption{}, + TblID: tid, + Start: offset, + HandleCols: handleCol, }, nil } @@ -5380,7 +5381,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete( // Columns can be seen by DELETE are the deletable columns. deletableCols := t.DeletableCols() deletableIdxs := t.DeletableIndices() - publicCols := t.Cols() tblLen := len(deletableCols) // Fix the start position of the columns. @@ -5415,25 +5415,6 @@ func pruneAndBuildSingleTableColPosInfoForDelete( fixedPos[i] = i - pruned } - // Fill in the ColumnSizes. - colPosInfo.ColumnsSizeHelper = &table.ColumnsSizeHelper{ - NotPruned: bitset.New(uint(len(publicCols))), - AvgSizes: make([]float64, 0, len(publicCols)), - PublicColsLayout: make([]int, 0, len(publicCols)), - } - colPosInfo.ColumnsSizeHelper.NotPruned.SetAll() - for i, col := range publicCols { - // If the column is not pruned, we can use the column data to get a more accurate size. - // We just need to record its position info. - if _, ok := fixedPos[col.Offset]; ok { - colPosInfo.ColumnsSizeHelper.PublicColsLayout = append(colPosInfo.ColumnsSizeHelper.PublicColsLayout, fixedPos[col.Offset]) - continue - } - // Otherwise we need to get the average size of the column by its field type. - // TODO: use statistics to get a maybe more accurate size. - colPosInfo.ColumnsSizeHelper.NotPruned.Clear(uint(i)) - colPosInfo.ColumnsSizeHelper.AvgSizes = append(colPosInfo.ColumnsSizeHelper.AvgSizes, float64(chunk.EstimateTypeWidth(&col.FieldType))) - } // Fix the index layout and fill in table.IndexRowLayoutOption. indexColMap := make(map[int64]table.IndexRowLayoutOption, len(deletableIdxs)) for _, idx := range deletableIdxs { diff --git a/pkg/planner/core/logical_plans_test.go b/pkg/planner/core/logical_plans_test.go index b10abfd47fd49..dffa2ca8230dc 100644 --- a/pkg/planner/core/logical_plans_test.go +++ b/pkg/planner/core/logical_plans_test.go @@ -2538,7 +2538,7 @@ func TestPruneColumnsForDelete(t *testing.T) { ret := make([][]string, 0, len(deletePlan.TblColPosInfos)) for _, colsLayout := range deletePlan.TblColPosInfos { innerRet := make([]string, 0, len(colsLayout.IndexesRowLayout)*2+2) - if colsLayout.ExtraPartialRowOption.IndexesRowLayout == nil { + if colsLayout.IndexesRowLayout == nil { sb.Reset() fmt.Fprintf(&sb, "no column-pruning happened") innerRet = append(innerRet, sb.String()) diff --git a/pkg/planner/indexadvisor/optimizer_test.go b/pkg/planner/indexadvisor/optimizer_test.go index 69af075d041ef..468cc1865611a 100644 --- a/pkg/planner/indexadvisor/optimizer_test.go +++ b/pkg/planner/indexadvisor/optimizer_test.go @@ -193,6 +193,7 @@ func TestOptimizerEstIndexSize(t *testing.T) { tk.MustExec(`insert into t values (1, space(32))`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustExec("analyze table t all columns") s, err = opt.EstIndexSize("test", "t", "a") require.NoError(t, err) require.Equal(t, float64(1), s) @@ -208,6 +209,7 @@ func TestOptimizerEstIndexSize(t *testing.T) { tk.MustExec(`insert into t values (1, space(64))`) require.NoError(t, h.DumpStatsDeltaToKV(true)) require.NoError(t, h.Update(context.Background(), dom.InfoSchema())) + tk.MustExec("analyze table t all columns") s, err = opt.EstIndexSize("test", "t", "a") require.NoError(t, err) require.Equal(t, float64(2), s) // 2 rows diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index 36af17e644c4b..b09d490f19320 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -36,6 +36,7 @@ go_test( "//pkg/sessionctx", "//pkg/sessionctx/stmtctx", "//pkg/sessionctx/variable", + "//pkg/statistics/handle/ddl/testutil", "//pkg/store/helper", "//pkg/store/mockstore", "//pkg/store/mockstore/unistore", diff --git a/pkg/server/handler/tests/http_handler_serial_test.go b/pkg/server/handler/tests/http_handler_serial_test.go index f9e9a6b3e30b8..8fccfa8e4f527 100644 --- a/pkg/server/handler/tests/http_handler_serial_test.go +++ b/pkg/server/handler/tests/http_handler_serial_test.go @@ -42,6 +42,7 @@ import ( "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx/variable" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/deadlockhistory" "github.com/pingcap/tidb/pkg/util/versioninfo" @@ -592,6 +593,7 @@ func TestGetSchemaStorage(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("create table t (c int, d int, e char(5), index idx(e))") + testutil.HandleNextDDLEventWithTxn(h) tk.MustExec(`insert into t(c, d, e) values(1, 2, "c"), (2, 3, "d"), (3, 4, "e")`) h.FlushStats() @@ -611,7 +613,7 @@ func TestGetSchemaStorage(t *testing.T) { sort.Strings(names) require.Equal(t, expects, names) - require.Equal(t, []int64{3, 18, 54, 0, 6, 0}, []int64{ + require.Equal(t, []int64{3, 16, 48, 0, 0, 0}, []int64{ tables[0].TableRows, tables[0].AvgRowLength, tables[0].DataLength, diff --git a/pkg/session/session.go b/pkg/session/session.go index 8fe917b358f43..c9350bd6f9be1 100644 --- a/pkg/session/session.go +++ b/pkg/session/session.go @@ -908,7 +908,7 @@ func (s *session) updateStatsDeltaToCollector() { if s.statsCollector != nil && mapper != nil { for _, item := range mapper { if item.TableID > 0 { - s.statsCollector.Update(item.TableID, item.Delta, item.Count, &item.ColSize) + s.statsCollector.Update(item.TableID, item.Delta, item.Count) } } } diff --git a/pkg/sessionctx/variable/session.go b/pkg/sessionctx/variable/session.go index 65ce1e9c5d969..89455c4acded0 100644 --- a/pkg/sessionctx/variable/session.go +++ b/pkg/sessionctx/variable/session.go @@ -341,38 +341,13 @@ func (tc *TransactionContext) CollectUnchangedKeysForLock(buf []kv.Key) []kv.Key return buf } -// ColSize is a data struct to store the delta information for a table. -type ColSize struct { - ColID int64 - Size int64 -} - -// DeltaCols is used to update the delta size for cols. -type DeltaCols interface { - // UpdateColSizeMap is used to update delta map for cols. - UpdateColSizeMap(m map[int64]int64) map[int64]int64 -} - -// DeltaColsMap implements DeltaCols -type DeltaColsMap map[int64]int64 - -// UpdateColSizeMap implements DeltaCols -func (cols DeltaColsMap) UpdateColSizeMap(m map[int64]int64) map[int64]int64 { - if m == nil && len(cols) > 0 { - m = make(map[int64]int64, len(cols)) - } - for colID, size := range cols { - m[colID] += size - } - return m -} - // UpdateDeltaForTable updates the delta info for some table. // The `cols` argument is used to update the delta size for cols. // If `cols` is nil, it means that the delta size for cols is not changed. func (tc *TransactionContext) UpdateDeltaForTable( - physicalTableID int64, delta int64, - count int64, cols DeltaCols, + physicalTableID int64, + delta int64, + count int64, ) { tc.tdmLock.Lock() defer tc.tdmLock.Unlock() @@ -383,9 +358,6 @@ func (tc *TransactionContext) UpdateDeltaForTable( item.Delta += delta item.Count += count item.TableID = physicalTableID - if cols != nil { - item.ColSize = cols.UpdateColSizeMap(item.ColSize) - } tc.TableDeltaMap[physicalTableID] = item } @@ -2916,7 +2888,6 @@ func (s *SessionVars) SetResourceGroupName(groupName string) { type TableDelta struct { Delta int64 Count int64 - ColSize map[int64]int64 InitTime time.Time // InitTime is the time that this delta is generated. TableID int64 } @@ -2926,7 +2897,6 @@ func (td TableDelta) Clone() TableDelta { return TableDelta{ Delta: td.Delta, Count: td.Count, - ColSize: maps.Clone(td.ColSize), InitTime: td.InitTime, TableID: td.TableID, } diff --git a/pkg/sessionctx/variable/session_test.go b/pkg/sessionctx/variable/session_test.go index 407b03eed582a..f4b6fac0144ed 100644 --- a/pkg/sessionctx/variable/session_test.go +++ b/pkg/sessionctx/variable/session_test.go @@ -334,14 +334,11 @@ func TestTableDeltaClone(t *testing.T) { td0 := variable.TableDelta{ Delta: 1, Count: 2, - ColSize: map[int64]int64{1: 1, 2: 2}, InitTime: time.Now(), TableID: 5, } td1 := td0.Clone() require.Equal(t, td0, td1) - td0.ColSize[3] = 3 - require.NotEqual(t, td0, td1) td2 := td0.Clone() require.Equal(t, td0, td2) @@ -356,7 +353,6 @@ func TestTransactionContextSavepoint(t *testing.T) { 1: { Delta: 1, Count: 2, - ColSize: map[int64]int64{1: 1}, InitTime: time.Now(), TableID: 5, }, @@ -375,11 +371,9 @@ func TestTransactionContextSavepoint(t *testing.T) { require.False(t, succ) require.Equal(t, 1, len(tc.Savepoints)) - tc.TableDeltaMap[1].ColSize[2] = 2 tc.TableDeltaMap[2] = variable.TableDelta{ Delta: 6, Count: 7, - ColSize: map[int64]int64{8: 8}, InitTime: time.Now(), TableID: 9, } @@ -389,7 +383,6 @@ func TestTransactionContextSavepoint(t *testing.T) { tc.AddSavepoint("S2", nil) require.Equal(t, 2, len(tc.Savepoints)) require.Equal(t, 1, len(tc.Savepoints[0].TxnCtxSavepoint.TableDeltaMap)) - require.Equal(t, 1, len(tc.Savepoints[0].TxnCtxSavepoint.TableDeltaMap[1].ColSize)) require.Equal(t, "s1", tc.Savepoints[0].Name) require.Equal(t, 2, len(tc.Savepoints[1].TxnCtxSavepoint.TableDeltaMap)) require.Equal(t, "s2", tc.Savepoints[1].Name) @@ -397,7 +390,6 @@ func TestTransactionContextSavepoint(t *testing.T) { tc.TableDeltaMap[3] = variable.TableDelta{ Delta: 10, Count: 11, - ColSize: map[int64]int64{12: 12}, InitTime: time.Now(), TableID: 13, } @@ -559,49 +551,6 @@ func TestSetStatus(t *testing.T) { require.Equal(t, mysql.ServerStatusAutocommit|mysql.ServerStatusCursorExists, sv.Status()) } -func TestMapDeltaCols(t *testing.T) { - for _, c := range []struct { - m map[int64]int64 - cols variable.DeltaColsMap - r map[int64]int64 - }{ - {}, - { - cols: map[int64]int64{1: 2}, - r: map[int64]int64{1: 2}, - }, - { - m: map[int64]int64{1: 2}, - r: map[int64]int64{1: 2}, - }, - { - m: map[int64]int64{1: 3, 3: 5, 5: 7}, - cols: map[int64]int64{1: 2, 3: -4, 6: 8}, - r: map[int64]int64{1: 5, 3: 1, 5: 7, 6: 8}, - }, - } { - originalCols := make(map[int64]int64) - for k, v := range c.cols { - originalCols[k] = v - } - - m2 := c.cols.UpdateColSizeMap(c.m) - require.Equal(t, c.r, m2) - if c.m == nil { - if len(c.cols) == 0 { - require.Nil(t, m2) - } - } else { - require.Equal(t, m2, c.m) - } - - if c.cols != nil { - // deltaCols not change - require.Equal(t, originalCols, map[int64]int64(c.cols)) - } - } -} - func TestRowIDShardGenerator(t *testing.T) { g := variable.NewRowIDShardGenerator(rand.New(rand.NewSource(12345)), 128) // #nosec G404) // default settings diff --git a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go index fd18834fdfffb..c64483c7725c6 100644 --- a/pkg/statistics/handle/autoanalyze/autoanalyze_test.go +++ b/pkg/statistics/handle/autoanalyze/autoanalyze_test.go @@ -358,6 +358,7 @@ func TestAutoAnalyzeSkipColumnTypes(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t(a int, b int, c json, d text, e mediumtext, f blob, g mediumblob, index idx(d(10)))") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) tk.MustExec("insert into t values (1, 2, null, 'xxx', 'yyy', null, null)") tk.MustExec("select * from t where a = 1 and b = 1 and c = '1'") h := dom.StatsHandle() @@ -430,6 +431,7 @@ func TestAutoAnalyzeOutOfSpecifiedTime(t *testing.T) { tk.MustExec("use test") tk.MustExec("create table t (a int)") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) // to pass the stats.Pseudo check in autoAnalyzeTable tk.MustExec("analyze table t") // to pass the AutoAnalyzeMinCnt check in autoAnalyzeTable diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go index e5188078af7dd..1bfb65e398fd3 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_ddl_handler_test.go @@ -67,6 +67,7 @@ func TestHandleDDLEventsWithRunningJobs(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") statistics.AutoAnalyzeMinCnt = 0 defer func() { @@ -174,6 +175,7 @@ func TestTruncateTable(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() + statstestutil.HandleNextDDLEventWithTxn(h) // Insert some data. testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -287,6 +289,7 @@ func TestDropTable(t *testing.T) { require.NoError(t, err) tableInfo := tbl.Meta() h := do.StatsHandle() + statstestutil.HandleNextDDLEventWithTxn(h) // Insert some data. testKit.MustExec("insert into t values (1,2),(2,2),(6,2),(11,2),(16,2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) @@ -825,11 +828,12 @@ func TestDropSchemaEventWithDynamicPartition(t *testing.T) { func TestDropSchemaEventWithStaticPartition(t *testing.T) { store, do := testkit.CreateMockStoreAndDomain(t) + h := do.StatsHandle() testKit := testkit.NewTestKit(t, store) testKit.MustExec("use test") testKit.MustExec("create table t (c1 int, c2 int, index idx(c1, c2)) partition by range columns (c1) (partition p0 values less than (5), partition p1 values less than (10))") + statstestutil.HandleNextDDLEventWithTxn(h) testKit.MustExec("set global tidb_partition_prune_mode='static'") - h := do.StatsHandle() // Insert some data. testKit.MustExec("insert into t values (1,2),(6,6)") require.NoError(t, h.DumpStatsDeltaToKV(true)) diff --git a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go index f12fc7df42378..89b4fd27eec68 100644 --- a/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go +++ b/pkg/statistics/handle/autoanalyze/priorityqueue/queue_test.go @@ -68,8 +68,11 @@ func TestAnalysisPriorityQueue(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") + handle := dom.StatsHandle() tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -78,7 +81,6 @@ func TestAnalysisPriorityQueue(t *testing.T) { }() ctx := context.Background() - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) @@ -123,7 +125,9 @@ func TestRefreshLastAnalysisDuration(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -177,7 +181,9 @@ func testProcessDMLChanges(t *testing.T, partitioned bool) { if partitioned { tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) // Because we don't handle the DDL events in unit tests by default, // we need to use this way to make sure the stats record for the global table is created. // Insert some rows into the tables. @@ -191,7 +197,9 @@ func testProcessDMLChanges(t *testing.T, partitioned bool) { } else { tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) } tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1), (2)") @@ -278,7 +286,9 @@ func TestProcessDMLChangesWithRunningJobs(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -363,6 +373,7 @@ func TestRequeueMustRetryJobs(t *testing.T) { tk.MustExec("create database example_schema") tk.MustExec("use example_schema") tk.MustExec("create table example_table (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) initJobs(tk) insertMultipleFinishedJobs(tk, "example_table", "") statistics.AutoAnalyzeMinCnt = 0 @@ -415,7 +426,9 @@ func TestProcessDMLChangesWithLockedTables(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("insert into t2 values (1)") statistics.AutoAnalyzeMinCnt = 0 @@ -474,6 +487,7 @@ func TestProcessDMLChangesWithLockedPartitionsAndDynamicPruneMode(t *testing.T) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(ctx, dom.InfoSchema())) @@ -534,6 +548,7 @@ func TestProcessDMLChangesWithLockedPartitionsAndStaticPruneMode(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int) partition by range (a) (partition p0 values less than (10), partition p1 values less than (20))") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") tk.MustExec("set global tidb_partition_prune_mode = 'static'") statistics.AutoAnalyzeMinCnt = 0 @@ -618,6 +633,7 @@ func TestPQHandlesTableDeletionGracefully(t *testing.T) { tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int)") + statstestutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("insert into t1 values (1)") statistics.AutoAnalyzeMinCnt = 0 defer func() { diff --git a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel index b8c337db3ddc4..c5d6ff6eb9af0 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel +++ b/pkg/statistics/handle/autoanalyze/refresher/BUILD.bazel @@ -41,6 +41,7 @@ go_test( "//pkg/sessionctx/sysproctrack", "//pkg/statistics", "//pkg/statistics/handle/autoanalyze/priorityqueue", + "//pkg/statistics/handle/ddl/testutil", "//pkg/statistics/handle/types", "//pkg/statistics/handle/util", "//pkg/testkit", diff --git a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go index c3c4d4aa58cb5..dcbc201fd669e 100644 --- a/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go +++ b/pkg/statistics/handle/autoanalyze/refresher/refresher_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/statistics" "github.com/pingcap/tidb/pkg/statistics/handle/autoanalyze/refresher" + "github.com/pingcap/tidb/pkg/statistics/handle/ddl/testutil" "github.com/pingcap/tidb/pkg/statistics/handle/util" "github.com/pingcap/tidb/pkg/testkit" "github.com/stretchr/testify/require" @@ -384,12 +385,13 @@ func TestDoNotRetryTableNotExistJob(t *testing.T) { }() store, dom := testkit.CreateMockStoreAndDomain(t) + handle := dom.StatsHandle() tk := testkit.NewTestKit(t, store) tk.MustExec("use test") tk.MustExec("create table t1 (a int, b int, index idx(a))") + testutil.HandleNextDDLEventWithTxn(handle) // Insert some data. tk.MustExec("insert into t1 values (1, 1)") - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) sysProcTracker := dom.SysProcTracker() @@ -431,13 +433,15 @@ func TestAnalyzeHighestPriorityTablesWithFailedAnalysis(t *testing.T) { }() store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) + handle := dom.StatsHandle() tk.MustExec("use test") tk.MustExec("set global tidb_enable_auto_analyze=true") tk.MustExec("set global tidb_auto_analyze_concurrency=2") tk.MustExec("create table t1 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") + testutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("create table t2 (a int, b int, index idx(a)) partition by range (a) (partition p0 values less than (2), partition p1 values less than (4))") + testutil.HandleNextDDLEventWithTxn(handle) tk.MustExec("analyze table t2") - handle := dom.StatsHandle() require.NoError(t, handle.DumpStatsDeltaToKV(true)) require.NoError(t, handle.Update(context.Background(), dom.InfoSchema())) tk.MustExec("insert into t1 values (1, 1), (2, 2), (3, 3)") diff --git a/pkg/statistics/handle/handletest/handle_test.go b/pkg/statistics/handle/handletest/handle_test.go index 84563dec1fc33..64c81379553f3 100644 --- a/pkg/statistics/handle/handletest/handle_test.go +++ b/pkg/statistics/handle/handletest/handle_test.go @@ -232,17 +232,9 @@ func TestLoadHist(t *testing.T) { newStatsTbl := h.GetTableStats(tableInfo) // The stats table is updated. require.False(t, oldStatsTbl == newStatsTbl) - // Only the TotColSize of histograms is updated. + // TotColSize of histograms is not changed. oldStatsTbl.ForEachColumnImmutable(func(id int64, hist *statistics.Column) bool { - require.Less(t, hist.TotColSize, newStatsTbl.GetCol(id).TotColSize) - - temp := hist.TotColSize - hist.TotColSize = newStatsTbl.GetCol(id).TotColSize - require.True(t, statistics.HistogramEqual(&hist.Histogram, &newStatsTbl.GetCol(id).Histogram, false)) - hist.TotColSize = temp - - require.True(t, hist.CMSketch.Equal(newStatsTbl.GetCol(id).CMSketch)) - require.Equal(t, newStatsTbl.GetCol(id).Info, hist.Info) + require.Equal(t, hist.TotColSize, newStatsTbl.GetCol(id).TotColSize) return false }) // Add column c3, we only update c3. @@ -1149,6 +1141,7 @@ func TestStatsCacheUpdateSkip(t *testing.T) { h := do.StatsHandle() testKit.MustExec("use test") testKit.MustExec("create table t (c1 int, c2 int)") + statstestutil.HandleNextDDLEventWithTxn(h) testKit.MustExec("insert into t values(1, 2)") require.NoError(t, h.DumpStatsDeltaToKV(true)) testKit.MustExec("analyze table t") diff --git a/pkg/statistics/handle/storage/update.go b/pkg/statistics/handle/storage/update.go index 008388ed76a67..fc312743f9c79 100644 --- a/pkg/statistics/handle/storage/update.go +++ b/pkg/statistics/handle/storage/update.go @@ -19,7 +19,6 @@ import ( "encoding/json" "fmt" "slices" - "strings" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/parser/ast" @@ -88,27 +87,6 @@ func UpdateStatsMeta( return err } -// DumpTableStatColSizeToKV dumps the column size stats to storage. -func DumpTableStatColSizeToKV(sctx sessionctx.Context, id int64, delta variable.TableDelta) error { - if len(delta.ColSize) == 0 { - return nil - } - values := make([]string, 0, len(delta.ColSize)) - for histID, deltaColSize := range delta.ColSize { - if deltaColSize == 0 { - continue - } - values = append(values, fmt.Sprintf("(%d, 0, %d, 0, %d)", id, histID, deltaColSize)) - } - if len(values) == 0 { - return nil - } - sql := fmt.Sprintf("insert into mysql.stats_histograms (table_id, is_index, hist_id, distinct_count, tot_col_size) "+ - "values %s on duplicate key update tot_col_size = GREATEST(0, tot_col_size + values(tot_col_size))", strings.Join(values, ",")) - _, _, err := statsutil.ExecRows(sctx, sql) - return errors.Trace(err) -} - // InsertExtendedStats inserts a record into mysql.stats_extended and update version in mysql.stats_meta. func InsertExtendedStats(sctx sessionctx.Context, statsCache types.StatsCache, diff --git a/pkg/statistics/handle/updatetest/update_test.go b/pkg/statistics/handle/updatetest/update_test.go index 76f20d0e83f85..ef2ecddae9548 100644 --- a/pkg/statistics/handle/updatetest/update_test.go +++ b/pkg/statistics/handle/updatetest/update_test.go @@ -135,7 +135,7 @@ func TestSingleSessionInsert(t *testing.T) { rs.Check(testkit.Rows("40", "70")) rs = testKit.MustQuery("select tot_col_size from mysql.stats_histograms").Sort() - rs.Check(testkit.Rows("0", "0", "20", "20")) + rs.Check(testkit.Rows("0", "0", "10", "10")) // test dump delta only when `modify count / count` is greater than the ratio. originValue := usage.DumpStatsDeltaRatio @@ -324,7 +324,7 @@ func TestUpdatePartition(t *testing.T) { statsTbl := h.GetPartitionStats(tableInfo, def.ID) require.Equal(t, int64(1), statsTbl.ModifyCount) require.Equal(t, int64(1), statsTbl.RealtimeCount) - require.Equal(t, int64(2), statsTbl.GetCol(bColID).TotColSize) + require.Equal(t, int64(0), statsTbl.GetCol(bColID).TotColSize) } testKit.MustExec(`update t set a = a + 1, b = "aa"`) @@ -334,7 +334,7 @@ func TestUpdatePartition(t *testing.T) { statsTbl := h.GetPartitionStats(tableInfo, def.ID) require.Equal(t, int64(2), statsTbl.ModifyCount) require.Equal(t, int64(1), statsTbl.RealtimeCount) - require.Equal(t, int64(3), statsTbl.GetCol(bColID).TotColSize) + require.Equal(t, int64(0), statsTbl.GetCol(bColID).TotColSize) } testKit.MustExec("delete from t") @@ -432,8 +432,7 @@ func TestAutoUpdate(t *testing.T) { // Modify count is non-zero means that we do not analyze the table. require.Equal(t, int64(1), stats.ModifyCount) stats.ForEachColumnImmutable(func(_ int64, item *statistics.Column) bool { - // TotColSize = 27, because the table has not been analyzed, and insert statement will add 3(length of 'eee') to TotColSize. - require.Equal(t, int64(27), item.TotColSize) + require.Equal(t, int64(23), item.TotColSize) return true }) @@ -514,6 +513,7 @@ func TestIssue25700(t *testing.T) { tk.MustExec("use test") tk.MustExec("drop table if exists t") tk.MustExec("CREATE TABLE `t` ( `ldecimal` decimal(32,4) DEFAULT NULL, `rdecimal` decimal(32,4) DEFAULT NULL, `gen_col` decimal(36,4) GENERATED ALWAYS AS (`ldecimal` + `rdecimal`) VIRTUAL, `col_timestamp` timestamp(3) NULL DEFAULT NULL ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;") + statstestutil.HandleNextDDLEventWithTxn(dom.StatsHandle()) tk.MustExec("analyze table t") tk.MustExec("INSERT INTO `t` (`ldecimal`, `rdecimal`, `col_timestamp`) VALUES (2265.2200, 9843.4100, '1999-12-31 16:00:00')" + strings.Repeat(", (2265.2200, 9843.4100, '1999-12-31 16:00:00')", int(statistics.AutoAnalyzeMinCnt))) require.NoError(t, dom.StatsHandle().DumpStatsDeltaToKV(true)) diff --git a/pkg/statistics/handle/usage/session_stats_collect.go b/pkg/statistics/handle/usage/session_stats_collect.go index 352a26dc1b9bc..805a85a02f4cf 100644 --- a/pkg/statistics/handle/usage/session_stats_collect.go +++ b/pkg/statistics/handle/usage/session_stats_collect.go @@ -108,17 +108,10 @@ func (s *statsUsageImpl) DumpStatsDeltaToKV(dumpAll bool) error { return errors.Trace(err) } if updated { - UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count, nil) - } - if err = storage.DumpTableStatColSizeToKV(sctx, id, item); err != nil { - delete(deltaMap, id) - return errors.Trace(err) - } - if updated { + UpdateTableDeltaMap(deltaMap, id, -item.Delta, -item.Count) delete(deltaMap, id) } else { m := deltaMap[id] - m.ColSize = nil deltaMap[id] = m } } @@ -303,10 +296,10 @@ func (s *SessionStatsItem) Delete() { } // Update will updates the delta and count for one table id. -func (s *SessionStatsItem) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { +func (s *SessionStatsItem) Update(id int64, delta int64, count int64) { s.Lock() defer s.Unlock() - s.mapper.Update(id, delta, count, colSize) + s.mapper.Update(id, delta, count) } // ClearForTest clears the mapper for test. @@ -453,10 +446,10 @@ func (m *TableDelta) GetDeltaAndReset() map[int64]variable.TableDelta { } // Update updates the delta of the table. -func (m *TableDelta) Update(id int64, delta int64, count int64, colSize *map[int64]int64) { +func (m *TableDelta) Update(id int64, delta int64, count int64) { m.lock.Lock() defer m.lock.Unlock() - UpdateTableDeltaMap(m.delta, id, delta, count, colSize) + UpdateTableDeltaMap(m.delta, id, delta, count) } // Merge merges the deltaMap into the TableDelta. @@ -467,23 +460,15 @@ func (m *TableDelta) Merge(deltaMap map[int64]variable.TableDelta) { m.lock.Lock() defer m.lock.Unlock() for id, item := range deltaMap { - UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count, &item.ColSize) + UpdateTableDeltaMap(m.delta, id, item.Delta, item.Count) } } // UpdateTableDeltaMap updates the delta of the table. -func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64, colSize *map[int64]int64) { +func UpdateTableDeltaMap(m map[int64]variable.TableDelta, id int64, delta int64, count int64) { item := m[id] item.Delta += delta item.Count += count - if item.ColSize == nil { - item.ColSize = make(map[int64]int64) - } - if colSize != nil { - for key, val := range *colSize { - item.ColSize[key] += val - } - } m[id] = item } diff --git a/pkg/table/BUILD.bazel b/pkg/table/BUILD.bazel index 85065af129027..12383de4ef8c6 100644 --- a/pkg/table/BUILD.bazel +++ b/pkg/table/BUILD.bazel @@ -38,7 +38,6 @@ go_library( "//pkg/util/sqlexec", "//pkg/util/timeutil", "//pkg/util/tracing", - "@com_github_bits_and_blooms_bitset//:bitset", "@com_github_pingcap_errors//:errors", "@org_uber_go_zap//:zap", ], diff --git a/pkg/table/table.go b/pkg/table/table.go index 7ffdd011c716d..bffc4d898d8f6 100644 --- a/pkg/table/table.go +++ b/pkg/table/table.go @@ -22,7 +22,6 @@ import ( "context" "time" - "github.com/bits-and-blooms/bitset" mysql "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/expression" "github.com/pingcap/tidb/pkg/expression/exprctx" @@ -218,7 +217,6 @@ type UpdateRecordOption interface { // RemoveRecordOpt contains the options will be used when removing a record. type RemoveRecordOpt struct { indexesLayoutOffset IndexesLayout - columnSize *ColumnsSizeHelper } // HasIndexesLayout returns whether the RemoveRecordOpt has indexes layout. @@ -236,11 +234,6 @@ func (opt *RemoveRecordOpt) GetIndexLayout(indexID int64) IndexRowLayoutOption { return opt.indexesLayoutOffset[indexID] } -// GetColumnSizeOpt returns the ColumnSizeOption of the RemoveRecordOpt. -func (opt *RemoveRecordOpt) GetColumnSizeOpt() *ColumnsSizeHelper { - return opt.columnSize -} - // NewRemoveRecordOpt creates a new RemoveRecordOpt with options. func NewRemoveRecordOpt(opts ...RemoveRecordOption) *RemoveRecordOpt { opt := &RemoveRecordOpt{} @@ -255,17 +248,6 @@ type RemoveRecordOption interface { applyRemoveRecordOpt(*RemoveRecordOpt) } -// ExtraPartialRowOption is the combined one of IndexesLayout and ColumnSizeOption. -type ExtraPartialRowOption struct { - IndexesRowLayout IndexesLayout - ColumnsSizeHelper *ColumnsSizeHelper -} - -func (e *ExtraPartialRowOption) applyRemoveRecordOpt(opt *RemoveRecordOpt) { - opt.indexesLayoutOffset = e.IndexesRowLayout - opt.columnSize = e.ColumnsSizeHelper -} - // IndexRowLayoutOption is the option for index row layout. // It is used to specify the order of the index columns in the row. type IndexRowLayoutOption []int @@ -282,21 +264,8 @@ func (idx IndexesLayout) GetIndexLayout(idxID int64) IndexRowLayoutOption { return idx[idxID] } -// ColumnsSizeHelper records the column size information. -// We're updating the total column size and total row size used in table statistics when doing DML. -// If the column is pruned when doing DML, we can't get the accurate size of the column. So we need the estimated avg size. -// - If the column is not pruned, we can calculate its acurate size by the real data. -// - Otherwise, we use the estimated avg size given by table statistics and field type information. -type ColumnsSizeHelper struct { - // NotPruned is a bitset to record the columns that are not pruned. - // The ith bit is 1 means the ith public column is not pruned. - NotPruned *bitset.BitSet - // If the column is pruned, we use the estimated avg size. They are stored by their ordinal in the table. - // The ith element is the estimated size of the ith pruned public column. - AvgSizes []float64 - // If the column is not pruned, we use the accurate size. They are stored by their ordinal in the pruned row. - // The ith element is the position of the ith public column in the pruned row. - PublicColsLayout []int +func (idx IndexesLayout) applyRemoveRecordOpt(opt *RemoveRecordOpt) { + opt.indexesLayoutOffset = idx } // CommonMutateOptFunc is a function to provide common options for mutating a table. diff --git a/pkg/table/tables/tables.go b/pkg/table/tables/tables.go index 79fedefe90b3c..9e78c0b76e722 100644 --- a/pkg/table/tables/tables.go +++ b/pkg/table/tables/tables.go @@ -533,21 +533,7 @@ func (t *TableCommon) updateRecord(sctx table.MutateContext, txn kv.Transaction, memBuffer.Release(sh) if s, ok := sctx.GetStatisticsSupport(); ok { - colSizeBuffer := mutateBuffers.GetColSizeDeltaBufferWithCap(len(t.Cols())) - for id, col := range t.Cols() { - size, err := codec.EstimateValueSize(tc, newData[id]) - if err != nil { - continue - } - newLen := size - 1 - size, err = codec.EstimateValueSize(tc, oldData[id]) - if err != nil { - continue - } - oldLen := size - 1 - colSizeBuffer.AddColSizeDelta(col.ID, int64(newLen-oldLen)) - } - s.UpdatePhysicalTableDelta(t.physicalTableID, 0, 1, colSizeBuffer) + s.UpdatePhysicalTableDelta(t.physicalTableID, 0, 1) } return nil } @@ -908,15 +894,7 @@ func (t *TableCommon) addRecord(sctx table.MutateContext, txn kv.Transaction, r memBuffer.Release(sh) if s, ok := sctx.GetStatisticsSupport(); ok { - colSizeBuffer := sctx.GetMutateBuffers().GetColSizeDeltaBufferWithCap(len(t.Cols())) - for id, col := range t.Cols() { - size, err := codec.EstimateValueSize(tc, r[id]) - if err != nil { - continue - } - colSizeBuffer.AddColSizeDelta(col.ID, int64(size-1)) - } - s.UpdatePhysicalTableDelta(t.physicalTableID, 1, 1, colSizeBuffer) + s.UpdatePhysicalTableDelta(t.physicalTableID, 1, 1) } return recordID, nil } @@ -1169,33 +1147,8 @@ func (t *TableCommon) removeRecord(ctx table.MutateContext, txn kv.Transaction, memBuffer.Release(sh) if s, ok := ctx.GetStatisticsSupport(); ok { - // a reusable buffer to save malloc - // Note: The buffer should not be referenced or modified outside this function. - // It can only act as a temporary buffer for the current function call. - colSizeBuffer := ctx.GetMutateBuffers().GetColSizeDeltaBufferWithCap(len(t.Cols())) - pruned, notPruned := 0, 0 - columnSizeOpt := opt.GetColumnSizeOpt() - var size int - for id, col := range t.Cols() { - columnOffset := id - if columnSizeOpt != nil { - if !columnSizeOpt.NotPruned.Test(uint(id)) { - size = int(columnSizeOpt.AvgSizes[pruned]) - pruned++ - colSizeBuffer.AddColSizeDelta(col.ID, -int64(size-1)) - continue - } - columnOffset = columnSizeOpt.PublicColsLayout[notPruned] - notPruned++ - } - size, err = codec.EstimateValueSize(tc, r[columnOffset]) - if err != nil { - continue - } - colSizeBuffer.AddColSizeDelta(col.ID, -int64(size-1)) - } s.UpdatePhysicalTableDelta( - t.physicalTableID, -1, 1, colSizeBuffer, + t.physicalTableID, -1, 1, ) } return err diff --git a/pkg/table/tblctx/BUILD.bazel b/pkg/table/tblctx/BUILD.bazel index 72906368a6f18..f7ea643d76115 100644 --- a/pkg/table/tblctx/BUILD.bazel +++ b/pkg/table/tblctx/BUILD.bazel @@ -32,7 +32,7 @@ go_test( srcs = ["buffers_test.go"], embed = [":tblctx"], flaky = True, - shard_count = 6, + shard_count = 5, deps = [ "//pkg/errctx", "//pkg/kv", diff --git a/pkg/table/tblctx/buffers.go b/pkg/table/tblctx/buffers.go index 53a33fdaa648b..5e9e8d93d0bb1 100644 --- a/pkg/table/tblctx/buffers.go +++ b/pkg/table/tblctx/buffers.go @@ -113,35 +113,6 @@ func (b *CheckRowBuffer) Reset(capacity int) { b.rowToCheck = ensureCapacityAndReset(b.rowToCheck, 0, capacity) } -// ColSizeDeltaBuffer implements variable.DeltaCols -var _ variable.DeltaCols = &ColSizeDeltaBuffer{} - -// ColSizeDeltaBuffer is a buffer to store the change of column size. -type ColSizeDeltaBuffer struct { - delta []variable.ColSize -} - -// Reset resets the inner buffers to a capacity. -func (b *ColSizeDeltaBuffer) Reset(capacity int) { - b.delta = ensureCapacityAndReset(b.delta, 0, capacity) -} - -// AddColSizeDelta adds the column size delta to the buffer. -func (b *ColSizeDeltaBuffer) AddColSizeDelta(colID int64, size int64) { - b.delta = append(b.delta, variable.ColSize{ColID: colID, Size: size}) -} - -// UpdateColSizeMap updates the column size map which uses columID as the map key and column size as the value. -func (b *ColSizeDeltaBuffer) UpdateColSizeMap(m map[int64]int64) map[int64]int64 { - if m == nil && len(b.delta) > 0 { - m = make(map[int64]int64, len(b.delta)) - } - for _, delta := range b.delta { - m[delta.ColID] += delta.Size - } - return m -} - // MutateBuffers is a memory pool for table related memory allocation that aims to reuse memory // and saves allocation. // It is used in table operations like AddRecord/UpdateRecord/DeleteRecord. @@ -149,10 +120,9 @@ func (b *ColSizeDeltaBuffer) UpdateColSizeMap(m map[int64]int64) map[int64]int64 // Because inner slices are reused, you should not call the get methods again before finishing the previous usage. // Otherwise, the previous data will be overwritten. type MutateBuffers struct { - stmtBufs *variable.WriteStmtBufs - encodeRow *EncodeRowBuffer - checkRow *CheckRowBuffer - colSizeDelta *ColSizeDeltaBuffer + stmtBufs *variable.WriteStmtBufs + encodeRow *EncodeRowBuffer + checkRow *CheckRowBuffer } // NewMutateBuffers creates a new `MutateBuffers`. @@ -163,8 +133,7 @@ func NewMutateBuffers(stmtBufs *variable.WriteStmtBufs) *MutateBuffers { encodeRow: &EncodeRowBuffer{ writeStmtBufs: stmtBufs, }, - checkRow: &CheckRowBuffer{}, - colSizeDelta: &ColSizeDeltaBuffer{}, + checkRow: &CheckRowBuffer{}, } } @@ -194,20 +163,6 @@ func (b *MutateBuffers) GetCheckRowBufferWithCap(capacity int) *CheckRowBuffer { return buffer } -// GetColSizeDeltaBufferWithCap gets the buffer for column size delta collection -// and resets the capacity of its inner slice. -// Usage: -// 1. Call `GetColSizeDeltaBufferWithCap` to get the buffer. -// 2. Call `ColSizeDeltaBuffer.AddColSizeDelta` for every column to add column size delta. -// 3. Call `ColSizeDeltaBuffer.UpdateColSizeMap` to update a column size map. -// Because the inner slices are reused, you should not call this method again before finishing the previous usage. -// Otherwise, the previous data will be overwritten. -func (b *MutateBuffers) GetColSizeDeltaBufferWithCap(capacity int) *ColSizeDeltaBuffer { - buffer := b.colSizeDelta - buffer.Reset(capacity) - return buffer -} - // GetWriteStmtBufs returns the `*variable.WriteStmtBufs` func (b *MutateBuffers) GetWriteStmtBufs() *variable.WriteStmtBufs { return b.stmtBufs diff --git a/pkg/table/tblctx/buffers_test.go b/pkg/table/tblctx/buffers_test.go index 266c2bb8108e1..264efe7619cec 100644 --- a/pkg/table/tblctx/buffers_test.go +++ b/pkg/table/tblctx/buffers_test.go @@ -201,35 +201,6 @@ func TestCheckRowBuffer(t *testing.T) { require.Equal(t, 6, cap(buffer.rowToCheck)) } -func TestColSizeDeltaBuffer(t *testing.T) { - buffer := &ColSizeDeltaBuffer{} - buffer.Reset(6) - require.Equal(t, 0, len(buffer.delta)) - require.Equal(t, 6, cap(buffer.delta)) - require.Nil(t, buffer.UpdateColSizeMap(nil)) - - buffer.AddColSizeDelta(1, 2) - buffer.AddColSizeDelta(3, -4) - buffer.AddColSizeDelta(10, 11) - require.Equal(t, []variable.ColSize{{ColID: 1, Size: 2}, {ColID: 3, Size: -4}, {ColID: 10, Size: 11}}, buffer.delta) - - require.Equal(t, map[int64]int64{1: 2, 3: -4, 10: 11}, buffer.UpdateColSizeMap(nil)) - m := make(map[int64]int64) - m2 := buffer.UpdateColSizeMap(m) - require.Equal(t, map[int64]int64{1: 2, 3: -4, 10: 11}, m2) - require.Equal(t, m2, m) - - m = map[int64]int64{1: 3, 3: 5, 5: 7} - m2 = buffer.UpdateColSizeMap(m) - require.Equal(t, map[int64]int64{1: 5, 3: 1, 5: 7, 10: 11}, m2) - require.Equal(t, m2, m) - - // reset should not shrink the capacity - buffer.Reset(2) - require.Equal(t, 0, len(buffer.delta)) - require.Equal(t, 6, cap(buffer.delta)) -} - func TestMutateBuffersGetter(t *testing.T) { stmtBufs := &variable.WriteStmtBufs{} buffers := NewMutateBuffers(stmtBufs) @@ -241,9 +212,6 @@ func TestMutateBuffersGetter(t *testing.T) { require.Equal(t, 6, cap(update.rowToCheck)) require.Equal(t, 6, cap(update.rowToCheck)) - colSize := buffers.GetColSizeDeltaBufferWithCap(6) - require.Equal(t, 6, cap(colSize.delta)) - require.Same(t, stmtBufs, buffers.GetWriteStmtBufs()) } diff --git a/pkg/table/tblctx/table.go b/pkg/table/tblctx/table.go index b9e0ecad3ac68..a5b30edc7fb4c 100644 --- a/pkg/table/tblctx/table.go +++ b/pkg/table/tblctx/table.go @@ -38,7 +38,7 @@ type RowEncodingConfig struct { // StatisticsSupport is used for statistics update operations. type StatisticsSupport interface { // UpdatePhysicalTableDelta updates the physical table delta. - UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64, cols variable.DeltaCols) + UpdatePhysicalTableDelta(physicalTableID int64, delta int64, count int64) } // CachedTableSupport is used for cached table operations diff --git a/pkg/table/tblsession/table.go b/pkg/table/tblsession/table.go index 2ede0148886a5..e28aaaee60955 100644 --- a/pkg/table/tblsession/table.go +++ b/pkg/table/tblsession/table.go @@ -127,10 +127,10 @@ func (ctx *MutateContext) GetStatisticsSupport() (tblctx.StatisticsSupport, bool // UpdatePhysicalTableDelta implements the StatisticsSupport interface. func (ctx *MutateContext) UpdatePhysicalTableDelta( - physicalTableID int64, delta int64, count int64, cols variable.DeltaCols, + physicalTableID int64, delta int64, count int64, ) { if txnCtx := ctx.vars().TxnCtx; txnCtx != nil { - txnCtx.UpdateDeltaForTable(physicalTableID, delta, count, cols) + txnCtx.UpdateDeltaForTable(physicalTableID, delta, count) } } diff --git a/pkg/table/tblsession/table_test.go b/pkg/table/tblsession/table_test.go index d48d524752dd8..f6487631fbfc1 100644 --- a/pkg/table/tblsession/table_test.go +++ b/pkg/table/tblsession/table_test.go @@ -103,14 +103,13 @@ func TestSessionMutateContextFields(t *testing.T) { require.NotNil(t, statisticsSupport) require.Equal(t, 0, len(txnCtx.TableDeltaMap)) statisticsSupport.UpdatePhysicalTableDelta( - 12, 1, 2, variable.DeltaColsMap(map[int64]int64{3: 4, 5: 6}), + 12, 1, 2, ) require.Equal(t, 1, len(txnCtx.TableDeltaMap)) deltaMap := txnCtx.TableDeltaMap[12] require.Equal(t, int64(12), deltaMap.TableID) require.Equal(t, int64(1), deltaMap.Delta) require.Equal(t, int64(2), deltaMap.Count) - require.Equal(t, map[int64]int64{3: 4, 5: 6}, deltaMap.ColSize) // cached table support sctx.GetSessionVars().TxnCtx = nil cachedTableSupport, ok := ctx.GetCachedTableSupport()