Skip to content

Commit

Permalink
[Enhancement] List Partition For AMV(Part 5): Support partial refresh…
Browse files Browse the repository at this point in the history
… list partition for mv (backport #50969) (#51139)

Signed-off-by: shuming.li <[email protected]>
Co-authored-by: shuming.li <[email protected]>
  • Loading branch information
mergify[bot] and LiShuMing authored Sep 19, 2024
1 parent 2739a06 commit fa22ef6
Show file tree
Hide file tree
Showing 32 changed files with 1,864 additions and 234 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public boolean containsToMergeProperties() {
return false;
}
if (containsKey(TaskRun.PARTITION_START) || containsKey(TaskRun.PARTITION_END)
|| containsKey(TaskRun.START_TASK_RUN_ID)) {
|| containsKey(TaskRun.START_TASK_RUN_ID) || containsKey(TaskRun.PARTITION_VALUES)) {
return true;
}
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ public class MvTaskRunContext extends TaskRunContext {

private String nextPartitionStart = null;
private String nextPartitionEnd = null;
// The next list partition values to be processed
private String nextPartitionValues = null;
private ExecPlan execPlan = null;

private int partitionTTLNumber = TableProperty.INVALID;
Expand Down Expand Up @@ -74,7 +76,7 @@ public void setMvRefBaseTableIntersectedPartitions(
}

public boolean hasNextBatchPartition() {
return nextPartitionStart != null && nextPartitionEnd != null;
return (nextPartitionStart != null && nextPartitionEnd != null) || (nextPartitionValues != null);
}

public String getNextPartitionStart() {
Expand All @@ -93,6 +95,14 @@ public void setNextPartitionEnd(String nextPartitionEnd) {
this.nextPartitionEnd = nextPartitionEnd;
}

public String getNextPartitionValues() {
return nextPartitionValues;
}

public void setNextPartitionValues(String nextPartitionValues) {
this.nextPartitionValues = nextPartitionValues;
}

public Map<Table, Map<String, Range<PartitionKey>>> getRefBaseTableRangePartitionMap() {
return refBaseTableRangePartitionMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.MaterializedViewExceptions;
import com.starrocks.common.Pair;
import com.starrocks.common.UserException;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
Expand All @@ -62,6 +61,7 @@
import com.starrocks.scheduler.mv.MVPCTRefreshPartitioner;
import com.starrocks.scheduler.mv.MVPCTRefreshPlanBuilder;
import com.starrocks.scheduler.mv.MVPCTRefreshRangePartitioner;
import com.starrocks.scheduler.mv.MVRefreshParams;
import com.starrocks.scheduler.mv.MVTraceUtils;
import com.starrocks.scheduler.mv.MVVersionManager;
import com.starrocks.scheduler.persist.MVTaskRunExtraMessage;
Expand Down Expand Up @@ -230,6 +230,7 @@ private boolean syncAndCheckPartitions(TaskRunContext context,
Map<TableSnapshotInfo, Set<String>> baseTableCandidatePartitions)
throws AnalysisException, LockTimeoutException {
// collect partition infos of ref base tables
LOG.debug("Start to sync and check partitions for mv: {}", materializedView.getName());
int retryNum = 0;
boolean checked = false;
Stopwatch stopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -263,7 +264,7 @@ private boolean syncAndCheckPartitions(TaskRunContext context,
}
Tracers.record("MVRefreshSyncPartitionsRetryTimes", String.valueOf(retryNum));

LOG.info("materialized view {} after checking partitions change {} times: {}, costs: {} ms",
LOG.info("Sync and check materialized view {} partition changing after {} times: {}, costs: {} ms",
materializedView.getName(), retryNum, checked, stopwatch.elapsed(TimeUnit.MILLISECONDS));
return checked;
}
Expand Down Expand Up @@ -297,9 +298,10 @@ private Set<String> checkMvToRefreshedPartitions(TaskRunContext context, boolean
tentative);
int partitionRefreshNumber = materializedView.getTableProperty().getPartitionRefreshNumber();
LOG.info("Filter partitions to refresh for materialized view {}, partitionRefreshNumber={}, " +
"partitionsToRefresh:{}, mvPotentialPartitionNames:{}, next start:{}, next end:{}",
"partitionsToRefresh:{}, mvPotentialPartitionNames:{}, next start:{}, next end:{}, " +
"next list values:{}",
materializedView.getName(), partitionRefreshNumber, mvToRefreshedPartitions, mvPotentialPartitionNames,
mvContext.getNextPartitionStart(), mvContext.getNextPartitionEnd());
mvContext.getNextPartitionStart(), mvContext.getNextPartitionEnd(), mvContext.getNextPartitionValues());
} finally {
locker.unLockTableWithIntensiveDbLock(db, materializedView, LockType.READ);
}
Expand Down Expand Up @@ -352,7 +354,7 @@ private RefreshJobStatus doRefreshMaterializedViewWithRetry(TaskRunContext taskR
IMaterializedViewMetricsEntity mvEntity) throws DmlException {
// Use current connection variables instead of mvContext's session variables to be better debug.
int maxRefreshMaterializedViewRetryNum = getMaxRefreshMaterializedViewRetryNum(taskRunContext.getCtx());
LOG.info("Start to refresh mv:{} with retry times:{}, try lock failure retry times:{}",
LOG.info("Start to refresh mv:{} with retry times:{}",
materializedView.getName(), maxRefreshMaterializedViewRetryNum);

Throwable lastException = null;
Expand Down Expand Up @@ -443,7 +445,8 @@ private RefreshJobStatus doRefreshMaterializedView(TaskRunContext context,
// ref table of materialized view : refreshed partition names
refTablePartitionNames = refTableRefreshPartitions.entrySet().stream()
.collect(Collectors.toMap(x -> x.getKey().getName(), Map.Entry::getValue));
LOG.info("materialized:{}, mvToRefreshedPartitions:{}, refTableRefreshPartitions:{}",
LOG.info("Check mv {} to refresh partitions: mvToRefreshedPartitions:{}, " +
"refTableRefreshPartitions:{}",
materializedView.getName(), mvToRefreshedPartitions, refTableRefreshPartitions);
// add a message into information_schema
logMvToRefreshInfoIntoTaskRun(mvToRefreshedPartitions, refTablePartitionNames);
Expand Down Expand Up @@ -515,6 +518,16 @@ private InsertStmt prepareRefreshPlan(Set<String> mvToRefreshedPartitions, Map<S
locker.unlock();
}

updateTaskRunStatus(status -> {
MVTaskRunExtraMessage message = status.getMvTaskRunExtraMessage();
if (message == null) {
return;
}
Map<String, String> planBuildMessage = planBuilder.getPlanBuilderMessage();
LOG.info("MV Refresh PlanBuilderMessage: {}", planBuildMessage);
message.setPlanBuilderMessage(planBuildMessage);
});

QueryDebugOptions debugOptions = ctx.getSessionVariable().getQueryDebugOptions();
// log the final mv refresh plan for each refresh for better trace and debug
if (LOG.isDebugEnabled() || debugOptions.isEnableQueryTraceLog()) {
Expand All @@ -531,6 +544,7 @@ private InsertStmt prepareRefreshPlan(Set<String> mvToRefreshedPartitions, Map<S
materializedView.getName(), String.join(",", mvToRefreshedPartitions), refTablePartitionNames);
}
mvContext.setExecPlan(execPlan);
LOG.info("Finished mv refresh plan for {}", materializedView.getName());
return insertStmt;
}

Expand Down Expand Up @@ -663,15 +677,22 @@ private void generateNextTaskRun() {
newProperties.put(proEntry.getKey(), proEntry.getValue());
}
}
newProperties.put(TaskRun.PARTITION_START, mvContext.getNextPartitionStart());
newProperties.put(TaskRun.PARTITION_END, mvContext.getNextPartitionEnd());
PartitionInfo partitionInfo = materializedView.getPartitionInfo();
if (partitionInfo.isListPartition()) {
//TODO: partition values may be too long, need to be optimized later.
newProperties.put(TaskRun.PARTITION_VALUES, mvContext.getNextPartitionValues());
} else {
newProperties.put(TaskRun.PARTITION_START, mvContext.getNextPartitionStart());
newProperties.put(TaskRun.PARTITION_END, mvContext.getNextPartitionEnd());
}
if (mvContext.getStatus() != null) {
newProperties.put(TaskRun.START_TASK_RUN_ID, mvContext.getStatus().getStartTaskRunId());
}
updateTaskRunStatus(status -> {
MVTaskRunExtraMessage extraMessage = status.getMvTaskRunExtraMessage();
extraMessage.setNextPartitionStart(mvContext.getNextPartitionStart());
extraMessage.setNextPartitionEnd(mvContext.getNextPartitionEnd());
extraMessage.setNextPartitionValues(mvContext.getNextPartitionValues());
});

// Partition refreshing task run should have the HIGHEST priority, and be scheduled before other tasks
Expand Down Expand Up @@ -699,7 +720,6 @@ private void refreshExternalTable(TaskRunContext context,
List<BaseTableInfo> baseTableInfos = materializedView.getBaseTableInfos();
// use it if refresh external table fails
ConnectContext connectContext = context.getCtx();
List<Pair<Table, BaseTableInfo>> toRepairTables = new ArrayList<>();
for (BaseTableInfo baseTableInfo : baseTableInfos) {
Optional<Database> dbOpt = GlobalStateMgr.getCurrentState().getMetadataMgr().getDatabase(baseTableInfo);
if (dbOpt.isEmpty()) {
Expand All @@ -719,7 +739,7 @@ private void refreshExternalTable(TaskRunContext context,

// refresh old table
if (table.isNativeTableOrMaterializedView() || table.isHiveView()) {
LOG.info("No need to refresh table:{} because it is native table or materialized view or connector view",
LOG.debug("No need to refresh table:{} because it is native table or materialized view or connector view",
baseTableInfo.getTableInfoStr());
continue;
}
Expand Down Expand Up @@ -890,7 +910,7 @@ private boolean syncPartitions() throws AnalysisException, LockTimeoutException

// do sync partitions (add or drop partitions) for materialized view
boolean result = mvRefreshPartitioner.syncAddOrDropPartitions();
LOG.info("finish sync partitions. mv:{}, cost(ms): {}", materializedView.getName(),
LOG.info("Finish sync mv:{} partitions, cost(ms): {}", materializedView.getName(),
stopwatch.elapsed(TimeUnit.MILLISECONDS));
return result;
}
Expand All @@ -903,20 +923,18 @@ public Set<String> getPartitionsToRefreshForMaterializedView(Map<String, String>
Set<String> mvPotentialPartitionNames,
boolean tentative)
throws AnalysisException {
String start = properties.get(TaskRun.PARTITION_START);
String end = properties.get(TaskRun.PARTITION_END);
boolean force = Boolean.parseBoolean(properties.get(TaskRun.FORCE));
PartitionInfo partitionInfo = materializedView.getPartitionInfo();
MVRefreshParams mvRefreshParams = new MVRefreshParams(partitionInfo, properties, tentative);
Set<String> needRefreshMvPartitionNames = getPartitionsToRefreshForMaterializedView(partitionInfo,
start, end, tentative || force, mvPotentialPartitionNames);
mvRefreshParams, mvPotentialPartitionNames);

// update mv extra message
if (!tentative) {
updateTaskRunStatus(status -> {
MVTaskRunExtraMessage extraMessage = status.getMvTaskRunExtraMessage();
extraMessage.setForceRefresh(force);
extraMessage.setPartitionStart(start);
extraMessage.setPartitionEnd(end);
extraMessage.setForceRefresh(mvRefreshParams.isForce());
extraMessage.setPartitionStart(mvRefreshParams.getRangeStart());
extraMessage.setPartitionEnd(mvRefreshParams.getRangeEnd());
});
}

Expand All @@ -936,25 +954,21 @@ private void updateTaskRunStatus(Consumer<TaskRunStatus> action) {

/**
* @param mvPartitionInfo : materialized view's partition info
* @param start : materialized view's refresh start in this task run
* @param end : materialized view's refresh end in this task run
* @param force : whether this task run is force or not
* @param mvRefreshParams : materialized view's refresh params
* @return the partitions to refresh for materialized view
* @throws AnalysisException
*/
private Set<String> getPartitionsToRefreshForMaterializedView(
PartitionInfo mvPartitionInfo,
String start,
String end,
boolean force,
Set<String> mvPotentialPartitionNames) throws AnalysisException {
if (force && start == null && end == null) {
private Set<String> getPartitionsToRefreshForMaterializedView(PartitionInfo mvPartitionInfo,
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames)
throws AnalysisException {
if (mvRefreshParams.isForceCompleteRefresh()) {
// Force refresh
int partitionTTLNumber = mvContext.getPartitionTTLNumber();
return mvRefreshPartitioner.getMVPartitionsToRefreshWithForce(partitionTTLNumber);
} else {
return mvRefreshPartitioner.getMVPartitionsToRefresh(mvPartitionInfo, snapshotBaseTables,
start, end, force, mvPotentialPartitionNames);
mvRefreshParams, mvPotentialPartitionNames);
}
}

Expand Down
8 changes: 4 additions & 4 deletions fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ public class TaskRun implements Comparable<TaskRun> {
public static final String MV_ID = "mvId";
public static final String PARTITION_START = "PARTITION_START";
public static final String PARTITION_END = "PARTITION_END";
// list partition values to be refreshed
public static final String PARTITION_VALUES = "PARTITION_VALUES";
public static final String FORCE = "FORCE";
public static final String START_TASK_RUN_ID = "START_TASK_RUN_ID";
// All properties that can be set in TaskRun
public static final Set<String> TASK_RUN_PROPERTIES =
ImmutableSet.of(
MV_ID, PARTITION_START, PARTITION_END,
FORCE, START_TASK_RUN_ID);
public static final Set<String> TASK_RUN_PROPERTIES = ImmutableSet.of(
MV_ID, PARTITION_START, PARTITION_END, FORCE, START_TASK_RUN_ID, PARTITION_VALUES);

// Only used in FE's UT
public static final String IS_TEST = "__IS_TEST__";
Expand Down
Loading

0 comments on commit fa22ef6

Please sign in to comment.