Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KYLIN-5748 Skip build flat table for index build job if all new created indexes can be built from existed parent index #2162

Open
wants to merge 3 commits into
base: kylin5
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public interface NBatchConstants {
String P_PARTITION_IDS = "partitionIds";
String P_BUCKETS = "buckets";

    // Marks a job created for JobTypeEnum.INDEX_BUILD; read back on the Spark side
    // (SegmentBuildJob) to decide whether dict build / flat-table generation can be skipped.
    String P_IS_INDEX_BUILD = "isIndexBuild";
String P_INCREMENTAL_BUILD = "incrementalBuild";
String P_SELECTED_PARTITION_COL = "selectedPartitionCol";
String P_SELECTED_PARTITION_VALUE = "selectedPartition";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,9 @@ private static NSparkCubingJob innerCreate(JobFactory.JobBuildParams params) {
job.setParam(NBatchConstants.P_SEGMENT_IDS, String.join(",", job.getTargetSegments()));
job.setParam(NBatchConstants.P_DATA_RANGE_START, String.valueOf(startTime));
job.setParam(NBatchConstants.P_DATA_RANGE_END, String.valueOf(endTime));
        // Tag index-build jobs so the executing job can detect them and skip the
        // dict/flat-table stages when every new index can be built from a parent index.
        if (isIndexBuildJob(jobType)) {
            job.setParam(NBatchConstants.P_IS_INDEX_BUILD, String.valueOf(true));
        }
if (CollectionUtils.isNotEmpty(ignoredSnapshotTables)) {
job.setParam(NBatchConstants.P_IGNORED_SNAPSHOT_TABLES, String.join(",", ignoredSnapshotTables));
}
Expand Down Expand Up @@ -323,6 +326,10 @@ public SparkCleanupTransactionalTableStep getCleanIntermediateTableStep() {
return getTask(SparkCleanupTransactionalTableStep.class);
}

private static boolean isIndexBuildJob(JobTypeEnum jobType) {
return JobTypeEnum.INDEX_BUILD.equals(jobType);
}

@Override
public void cancelJob() {
NDataflowManager nDataflowManager = NDataflowManager.getInstance(getConfig(), getProject());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.kylin.guava30.shaded.common.base.Throwables;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.cube.cuboid.AdaptiveSpanningTree;
import org.apache.kylin.metadata.cube.model.NBatchConstants;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
Expand Down Expand Up @@ -62,6 +63,7 @@
public class SegmentBuildJob extends SegmentJob {

    // Whether the cost-based planner is enabled; set in extraInit() from a job param.
    private boolean usePlanner = false;
    // Whether this job was created as an index build job; set in extraInit()
    // from NBatchConstants.P_IS_INDEX_BUILD.
    private boolean isIndexBuild = false;

public static void main(String[] args) {
SegmentBuildJob segmentBuildJob = new SegmentBuildJob();
Expand All @@ -75,6 +77,10 @@ protected final void extraInit() {
if (enablePlanner != null && Boolean.valueOf(enablePlanner)) {
usePlanner = true;
}
String isIndexBuildJob = getParam(NBatchConstants.P_IS_INDEX_BUILD);
if (isIndexBuildJob != null && Boolean.valueOf(isIndexBuildJob)) {
isIndexBuild = true;
}
}

@Override
Expand Down Expand Up @@ -145,6 +151,21 @@ protected void build() throws IOException {

val buildParam = new BuildParam();
MATERIALIZED_FACT_TABLE.createStage(this, seg, buildParam, exec);
if (isIndexBuild) {
if (Objects.isNull(buildParam.getBuildFlatTable())) {
val spanTree = new AdaptiveSpanningTree(config,
new AdaptiveSpanningTree.AdaptiveTreeBuilder(seg, this.getReadOnlyLayouts()));
buildParam.setSpanningTree(spanTree);
}
if (!buildParam.getSpanningTree().fromFlatTable()) {
log.info("this is an index build job for segment " +
seg.getId() +
" and all new created indexes will be built from parent index, " +
"will skip build dict and generate flat table");
buildParam.setSkipBuildDict(true);
buildParam.setSkipGenerateFlatTable(true);
}
}
BUILD_DICT.createStage(this, seg, buildParam, exec);
GENERATE_FLAT_TABLE.createStage(this, seg, buildParam, exec);
// enable cost based planner according to the parameter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class BuildParam {
private var cachedPartitionStats: Map[Long, Statistics] =
immutable.Map.newBuilder[Long, Statistics].result()

private var skipBuildDict: Boolean = _
private var skipGenerateFlatTable: Boolean = _
private var skipMaterializedFactTableView: Boolean = _

Expand All @@ -73,6 +74,12 @@ class BuildParam {
this.skipMaterializedFactTableView = skipMaterializedFactTableView
}

  /** Whether the BuildDict stage should be reported as skipped. */
  def isSkipBuildDict: Boolean = skipBuildDict

  /** Flags the BuildDict stage to be skipped (used when all new indexes build from a parent index). */
  def setSkipBuildDict(skipBuildDict: Boolean): Unit = {
    this.skipBuildDict = skipBuildDict
  }

  /** Whether the GenerateFlatTable stage should be reported as skipped. */
  def isSkipGenerateFlatTable: Boolean = skipGenerateFlatTable

def setSkipGenerateFlatTable(skipGenerateFlatTable: Boolean): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,9 @@ class BuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buildParam: B
buildParam.setDict(dict)
}

  // NOTE(review): this check sits in the class body, so it executes once at
  // construction time — unlike PartitionBuildDict, where the equivalent check
  // is placed inside execute(). At construction the skip flag may not have been
  // set yet. Confirm whether this should live inside execute() here as well.
  if (buildParam.isSkipBuildDict) {
    onStageSkipped()
  }

  /** Stage name used for job step identification/reporting. */
  override def getStageName: String = "BuildDict"
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,10 @@ class PartitionBuildDict(jobContext: SegmentJob, dataSegment: NDataSegment, buil
override def execute(): Unit = {
val dict: Dataset[Row] = buildDictIfNeed()
buildParam.setDict(dict)
}

if (buildParam.isSkipBuildDict) {
onStageSkipped()
}
}
  /** Stage name used for job step identification/reporting. */
  override def getStageName: String = "PartitionBuildDict"
}