Skip to content

Commit

Permalink
[improvement](statistics)Async drop table stats while doing truncate …
Browse files Browse the repository at this point in the history
…and schema change. (apache#45923)

### What problem does this PR solve?

Drop table stats asynchronously while doing truncate and schema change.
Truncate and schema change operations may hold the table's write lock, and
both of them trigger dropping the old stats info. Dropping stats while
holding the write lock may cause performance issues.

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None
  • Loading branch information
Jibing-Li authored Dec 26, 2024
1 parent 4036b67 commit cd9e5bb
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@
import org.apache.doris.statistics.TableStatsMeta;
import org.apache.doris.statistics.UpdatePartitionStatsTarget;
import org.apache.doris.statistics.query.QueryStats;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.system.Backend;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService;
Expand Down Expand Up @@ -3399,8 +3400,9 @@ public TStatus invalidateStatsCache(TInvalidateFollowerStatsCacheRequest request
partitionNames = new PartitionNames(false, new ArrayList<>(target.partitions));
}
if (target.isTruncate) {
analysisManager.submitAsyncDropStatsTask(target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames);
TableIf table = StatisticsUtil.findTable(target.catalogId, target.dbId, target.tableId);
analysisManager.submitAsyncDropStatsTask(table, target.catalogId, target.dbId,
target.tableId, tableStats, partitionNames, false);
} else {
analysisManager.invalidateLocalStats(target.catalogId, target.dbId, target.tableId,
target.columns, tableStats, partitionNames);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,12 +159,12 @@ public class AnalysisManager implements Writable {
public AnalysisManager() {
if (!Env.isCheckpointThread()) {
this.taskExecutor = new AnalysisTaskExecutor(Config.statistics_simultaneously_running_task_num,
Integer.MAX_VALUE);
Integer.MAX_VALUE, "Manual Analysis Job Executor");
this.statisticsCache = new StatisticsCache();
this.dropStatsExecutors = ThreadPoolManager.newDaemonThreadPool(
1, 1, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(10),
new ThreadPoolExecutor.AbortPolicy(),
1, 3, 10,
TimeUnit.DAYS, new LinkedBlockingQueue<>(20),
new ThreadPoolExecutor.DiscardPolicy(),
"Drop stats executor", true);
}
}
Expand Down Expand Up @@ -696,20 +696,7 @@ public void dropStats(TableIf table, PartitionNames partitionNames) {
long catalogId = table.getDatabase().getCatalog().getId();
long dbId = table.getDatabase().getId();
long tableId = table.getId();
if (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
}
submitAsyncDropStatsTask(catalogId, dbId, tableId, tableStats, partitionNames);
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar() && partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
// Drop stats ddl is master only operation.
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
StatisticsRepository.dropStatistics(catalogId, dbId, table.getId(), null, partitions);
submitAsyncDropStatsTask(table, catalogId, dbId, tableId, tableStats, partitionNames, true);
} catch (Throwable e) {
LOG.warn("Failed to drop stats for table {}", table.getName(), e);
}
Expand All @@ -722,30 +709,55 @@ class DropStatsTask implements Runnable {
private final Set<String> columns;
private final TableStatsMeta tableStats;
private final PartitionNames partitionNames;
private final TableIf table;
private final boolean isMaster;

public DropStatsTask(long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames) {
public DropStatsTask(TableIf table, long catalogId, long dbId, long tableId, Set<String> columns,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
this.catalogId = catalogId;
this.dbId = dbId;
this.tableId = tableId;
this.columns = columns;
this.tableStats = tableStats;
this.partitionNames = partitionNames;
this.table = table;
this.isMaster = isMaster;
}

@Override
public void run() {
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
try {
if (isMaster) {
if (!table.isPartitionedTable() || partitionNames == null
|| partitionNames.isStar() || partitionNames.getPartitionNames() == null) {
removeTableStats(tableId);
Env.getCurrentEnv().getEditLog().logDeleteTableStats(new TableStatsDeletionLog(tableId));
}
// Drop stats ddl is master only operation.
Set<String> partitions = null;
if (partitionNames != null && !partitionNames.isStar()
&& partitionNames.getPartitionNames() != null) {
partitions = new HashSet<>(partitionNames.getPartitionNames());
}
// Drop stats ddl is master only operation.
StatisticsRepository.dropStatistics(catalogId, dbId, tableId, null, partitions);
invalidateRemoteStats(catalogId, dbId, tableId, null, partitions, true);
}
invalidateLocalStats(catalogId, dbId, tableId, columns, tableStats, partitionNames);
} catch (Throwable t) {
LOG.info("Failed to async drop stats for table {}.{}.{}, reason: {}",
catalogId, dbId, tableId, t.getMessage());
}
}
}

public void submitAsyncDropStatsTask(long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames) {
public void submitAsyncDropStatsTask(TableIf table, long catalogId, long dbId, long tableId,
TableStatsMeta tableStats, PartitionNames partitionNames, boolean isMaster) {
try {
dropStatsExecutors.submit(new DropStatsTask(catalogId, dbId, tableId, null, tableStats, partitionNames));
dropStatsExecutors.submit(new DropStatsTask(table, catalogId, dbId, tableId, null,
tableStats, partitionNames, isMaster));
} catch (Throwable t) {
LOG.info("Failed to drop stats for truncate table {}.{}.{}. Reason:{}",
catalogId, dbId, tableId, t.getMessage());
LOG.info("Failed to submit async drop stats job. reason: {}", t.getMessage());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,17 @@ public class AnalysisTaskExecutor {
Comparator.comparingLong(AnalysisTaskWrapper::getStartTime));

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum) {
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE);
this(simultaneouslyRunningTaskNum, Integer.MAX_VALUE, "Analysis Job Executor");
}

public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize) {
public AnalysisTaskExecutor(int simultaneouslyRunningTaskNum, int taskQueueSize, String poolName) {
if (!Env.isCheckpointThread()) {
executors = ThreadPoolManager.newDaemonThreadPool(
simultaneouslyRunningTaskNum,
simultaneouslyRunningTaskNum, 0,
TimeUnit.DAYS, new LinkedBlockingQueue<>(taskQueueSize),
new BlockedPolicy("Analysis Job Executor Block Policy", Integer.MAX_VALUE),
"Analysis Job Executor", true);
poolName, true);
cancelExpiredTask();
} else {
executors = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public class StatisticsAutoCollector extends MasterDaemon {
public StatisticsAutoCollector() {
super("Automatic Analyzer", TimeUnit.MINUTES.toMillis(Config.auto_check_statistics_in_minutes));
this.analysisTaskExecutor = new AnalysisTaskExecutor(Config.auto_analyze_simultaneously_running_task_num,
StatisticConstants.TASK_QUEUE_CAP);
StatisticConstants.TASK_QUEUE_CAP, "Auto Analysis Job Executor");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -659,11 +659,11 @@ public void invalidateLocalStats(long catalogId, long dbId, long tableId, Set<St
AnalysisManager analysisManager = new AnalysisManager();
for (int i = 0; i < 20; i++) {
System.out.println("Submit " + i);
analysisManager.submitAsyncDropStatsTask(0, 0, 0, null, null);
analysisManager.submitAsyncDropStatsTask(null, 0, 0, 0, null, null, false);
}
Thread.sleep(25000);
Thread.sleep(10000);
System.out.println(count.get());
Assertions.assertTrue(count.get() > 10);
Assertions.assertTrue(count.get() < 20);
Assertions.assertTrue(count.get() > 0);
Assertions.assertTrue(count.get() <= 20);
}
}
20 changes: 20 additions & 0 deletions regression-test/suites/statistics/analyze_stats.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,22 @@ import java.util.stream.Collectors

suite("test_analyze") {

def stats_dropped = { table ->
    // Table stats are dropped asynchronously after truncate / schema change,
    // so poll until both the cached and the stored column stats of `table`
    // are empty. Fails the suite if they are still present after the wait.
    final int maxWaitSeconds = 120
    boolean dropped = false
    for (int i = 0; i <= maxWaitSeconds; i++) {
        def cachedStats = sql """show column cached stats $table"""
        def storedStats = sql """show column stats $table"""
        if (0 == cachedStats.size() && 0 == storedStats.size()) {
            dropped = true
            break
        }
        // The result fetched after the last sleep is evaluated by the final
        // iteration above, so no extra sleep is needed once the budget is spent.
        if (i < maxWaitSeconds) {
            Thread.sleep(1000)
        }
    }
    assertTrue(dropped)
}

String db = "test_analyze"

String tbl = "analyzetestlimited_duplicate_all"
Expand Down Expand Up @@ -1152,6 +1168,8 @@ PARTITION `p599` VALUES IN (599)
ALTER TABLE analyze_test_with_schema_update ADD COLUMN tbl_name VARCHAR(256) DEFAULT NULL;
"""

stats_dropped("analyze_test_with_schema_update")

sql """
ANALYZE TABLE analyze_test_with_schema_update WITH SYNC
"""
Expand Down Expand Up @@ -1349,6 +1367,7 @@ PARTITION `p599` VALUES IN (599)
def result_before_truncate = sql """show column stats ${tbl}"""
assertEquals(14, result_before_truncate.size())
sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
def result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
result_after_truncate = sql """show column cached stats ${tbl}"""
Expand All @@ -1375,6 +1394,7 @@ PARTITION `p599` VALUES IN (599)
assert "1111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111111" == truncate_test_result[0][8].substring(1, 1025)

sql """TRUNCATE TABLE ${tbl}"""
stats_dropped(tbl)
result_after_truncate = sql """show column stats ${tbl}"""
assertEquals(0, result_after_truncate.size())
sql """ANALYZE TABLE ${tbl} WITH SYNC"""
Expand Down
17 changes: 17 additions & 0 deletions regression-test/suites/statistics/test_analyze_mv.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,22 @@ suite("test_analyze_mv") {
assertTrue(found)
}

def stats_dropped = { table ->
    // Table stats are dropped asynchronously after truncate / schema change,
    // so poll until both the cached and the stored column stats of `table`
    // are empty. Fails the suite if they are still present after the wait.
    final int maxWaitSeconds = 120
    boolean dropped = false
    for (int i = 0; i <= maxWaitSeconds; i++) {
        def cachedStats = sql """show column cached stats $table"""
        def storedStats = sql """show column stats $table"""
        if (0 == cachedStats.size() && 0 == storedStats.size()) {
            dropped = true
            break
        }
        // The result fetched after the last sleep is evaluated by the final
        // iteration above, so no extra sleep is needed once the budget is spent.
        if (i < maxWaitSeconds) {
            Thread.sleep(1000)
        }
    }
    assertTrue(dropped)
}

sql """drop database if exists test_analyze_mv"""
sql """create database test_analyze_mv"""
sql """use test_analyze_mv"""
Expand Down Expand Up @@ -674,6 +690,7 @@ suite("test_analyze_mv") {
// * Test row count report and report for nereids
sql """truncate table mvTestDup"""
result_row = sql """show index stats mvTestDup mv3"""
stats_dropped("mvTestDup")
assertEquals(1, result_row.size())
assertEquals("mvTestDup", result_row[0][0])
assertEquals("mv3", result_row[0][1])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,22 @@

suite("test_drop_stats_and_truncate") {

def stats_dropped = { table ->
    // Table stats are dropped asynchronously after truncate / schema change,
    // so poll until both the cached and the stored column stats of `table`
    // are empty. Fails the suite if they are still present after the wait.
    final int maxWaitSeconds = 120
    boolean dropped = false
    for (int i = 0; i <= maxWaitSeconds; i++) {
        def cachedStats = sql """show column cached stats $table"""
        def storedStats = sql """show column stats $table"""
        if (0 == cachedStats.size() && 0 == storedStats.size()) {
            dropped = true
            break
        }
        // The result fetched after the last sleep is evaluated by the final
        // iteration above, so no extra sleep is needed once the budget is spent.
        if (i < maxWaitSeconds) {
            Thread.sleep(1000)
        }
    }
    assertTrue(dropped)
}

sql """drop database if exists test_drop_stats_and_truncate"""
sql """create database test_drop_stats_and_truncate"""
sql """use test_drop_stats_and_truncate"""
Expand Down Expand Up @@ -101,6 +117,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(3, columns.size())

sql """truncate table non_part"""
stats_dropped("non_part")
result = sql """show column stats non_part"""
assertEquals(0, result.size())
result = sql """show table stats non_part"""
Expand Down Expand Up @@ -148,6 +165,7 @@ suite("test_drop_stats_and_truncate") {
assertEquals(9, columns.size())

sql """truncate table part"""
stats_dropped("part")
result = sql """show column stats part"""
assertEquals(0, result.size())
result = sql """show table stats part"""
Expand Down

0 comments on commit cd9e5bb

Please sign in to comment.