[Refactor] support incremental scan ranges deployment (#50189)
Signed-off-by: yanz <[email protected]>
dirtysalt committed Sep 4, 2024
1 parent 4eff9df commit 234d20a
Showing 20 changed files with 514 additions and 59 deletions.
@@ -106,7 +106,15 @@ private void setupCloudCredential() {

@Override
public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) {
return scanRangeSource.getAllOutputs();
if (maxScanRangeLength == 0) {
return scanRangeSource.getAllOutputs();
}
return scanRangeSource.getOutputs((int) maxScanRangeLength);
}

@Override
public boolean hasMoreScanRanges() {
return scanRangeSource.hasMoreOutput();
}

@Override
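The batching contract introduced in this hunk can be read as follows: a maxScanRangeLength of 0 preserves the old drain-everything behavior, while a positive value caps how many ranges each call returns, and hasMoreOutput() reports whether another batch remains. A minimal sketch of such a source, using illustrative stand-in types rather than the real connector classes:

    // Minimal sketch (stand-in types, not the real connector classes): a source
    // that either drains everything at once or hands out bounded batches.
    import java.util.ArrayList;
    import java.util.List;

    class SketchScanRange {}

    class SketchScanRangeSource {
        private final List<SketchScanRange> pending = new ArrayList<>();

        SketchScanRangeSource(List<SketchScanRange> all) {
            pending.addAll(all);
        }

        // Pre-refactor behavior: one call returns every range.
        List<SketchScanRange> getAllOutputs() {
            List<SketchScanRange> out = new ArrayList<>(pending);
            pending.clear();
            return out;
        }

        // Incremental path: at most maxSize ranges per call.
        List<SketchScanRange> getOutputs(int maxSize) {
            int n = Math.min(maxSize, pending.size());
            List<SketchScanRange> out = new ArrayList<>(pending.subList(0, n));
            pending.subList(0, n).clear(); // drop the drained prefix
            return out;
        }

        boolean hasMoreOutput() {
            return !pending.isEmpty();
        }
    }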
4 changes: 4 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java
@@ -100,6 +100,10 @@ public boolean isLocalNativeTable() {
return false;
}

public boolean hasMoreScanRanges() {
return false;
}

/**
* cast expr to SlotDescriptor type
*/
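Because the base-class default returns false, every existing ScanNode keeps its one-shot behavior while incremental-capable nodes opt in by overriding. A hedged sketch of the consumer contract this enables (stand-in interface and hypothetical batch size, not the real ScanNode API):

    import java.util.ArrayList;
    import java.util.List;

    // Sketch: pull batches until the node reports no further ranges. With the
    // base-class default of false, the loop degenerates to a single call.
    public class DrainScanRanges {
        interface BatchedScanNode {
            List<String> getScanRangeLocations(long maxScanRangeLength);
            boolean hasMoreScanRanges();
        }

        static List<String> drain(BatchedScanNode node, long batchSize) {
            List<String> all = new ArrayList<>(node.getScanRangeLocations(batchSize));
            while (node.hasMoreScanRanges()) {
                all.addAll(node.getScanRangeLocations(batchSize));
            }
            return all;
        }
    }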
@@ -36,6 +36,7 @@
import com.starrocks.qe.scheduler.assignment.FragmentAssignmentStrategyFactory;
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
import com.starrocks.qe.scheduler.dag.ExecutionFragment;
import com.starrocks.qe.scheduler.dag.FragmentInstance;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
@@ -258,6 +259,14 @@ void computeFragmentInstances() throws UserException {
executionDAG.finalizeDAG();
}

public void assignIncrementalScanRangesToFragmentInstances(ExecutionFragment execFragment) throws UserException {
execFragment.getScanRangeAssignment().clear();
for (FragmentInstance instance : execFragment.getInstances()) {
instance.getNode2ScanRanges().clear();
}
fragmentAssignmentStrategyFactory.create(execFragment, workerProvider).assignFragmentToWorker(execFragment);
}

private void validateExecutionDAG() throws StarRocksPlannerException {
for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) {
DataSink sink = execFragment.getPlanFragment().getSink();
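assignIncrementalScanRangesToFragmentInstances rebuilds a fragment's assignment from scratch on each round: it clears the fragment-level map and every instance's per-node map before re-running the assignment strategy, so a redeployed instance carries only the newly fetched ranges. A minimal sketch of that clear-then-reassign pattern (stand-in types; an illustrative round-robin stands in for the real FragmentAssignmentStrategy):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    class SketchInstance {
        // scan node id -> ranges assigned to this instance for the current batch
        final Map<Integer, List<String>> node2ScanRanges = new HashMap<>();
    }

    class SketchFragment {
        final List<SketchInstance> instances = new ArrayList<>();
        final Map<Long, List<String>> scanRangeAssignment = new HashMap<>();
    }

    class IncrementalAssigner {
        static final int SCAN_NODE_ID = 0; // hypothetical single scan node

        // Wipe the previous batch, then assign only the new ranges.
        static void assignNextBatch(SketchFragment fragment, List<String> nextBatch) {
            fragment.scanRangeAssignment.clear();
            for (SketchInstance instance : fragment.instances) {
                instance.node2ScanRanges.clear();
            }
            for (int i = 0; i < nextBatch.size(); i++) {
                SketchInstance target = fragment.instances.get(i % fragment.instances.size());
                target.node2ScanRanges
                        .computeIfAbsent(SCAN_NODE_ID, k -> new ArrayList<>())
                        .add(nextBatch.get(i));
            }
        }
    }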
@@ -56,6 +56,7 @@
import com.starrocks.datacache.DataCacheSelectMetrics;
import com.starrocks.mysql.MysqlCommand;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.PlanFragmentId;
import com.starrocks.planner.ResultSink;
import com.starrocks.planner.RuntimeFilterDescription;
import com.starrocks.planner.ScanNode;
@@ -74,6 +75,7 @@
import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState;
import com.starrocks.qe.scheduler.dag.JobSpec;
import com.starrocks.qe.scheduler.dag.PhasedExecutionSchedule;
import com.starrocks.qe.scheduler.slot.DeployState;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
@@ -82,6 +84,7 @@
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TDescriptorTable;
import com.starrocks.thrift.TExecPlanFragmentParams;
import com.starrocks.thrift.TLoadJobType;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TQueryType;
@@ -101,6 +104,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -478,6 +482,12 @@ public void prepareExec() throws Exception {
jobSpec.getFragments().forEach(fragment -> fragment.limitMaxPipelineDop(slot.getPipelineDop()));
}

if (connectContext != null) {
if (connectContext.getSessionVariable().isEnableConnectorIncrementalScanRanges()) {
jobSpec.setIncrementalScanRanges(true);
}
}

coordinatorPreprocessor.prepareExec();

prepareResultSink();
@@ -636,14 +646,69 @@ private void deliverExecFragments(boolean needDeploy) throws RpcException, UserException {
Deployer deployer =
new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(),
this::handleErrorExecution, needDeploy);
schedule.prepareSchedule(deployer, executionDAG);
schedule.prepareSchedule(this, deployer, executionDAG);
this.schedule.schedule();
queryProfile.attachExecutionProfiles(executionDAG.getExecutions());
} finally {
unlock();
}
}

@Override
public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates)
throws UserException {
List<DeployState> updatedStates = new ArrayList<>();
if (!jobSpec.isIncrementalScanRanges()) {
return updatedStates;
}
for (DeployState state : deployStates) {

Set<PlanFragmentId> planFragmentIds = new HashSet<>();
for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
planFragmentIds.add(execState.getFragmentId());
}
}

Set<PlanFragmentId> updatedPlanFragmentIds = new HashSet<>();
for (PlanFragmentId fragmentId : planFragmentIds) {
boolean hasMoreScanRanges = false;
ExecutionFragment fragment = executionDAG.getFragment(fragmentId);
for (ScanNode scanNode : fragment.getScanNodes()) {
if (scanNode.hasMoreScanRanges()) {
hasMoreScanRanges = true;
}
}
if (hasMoreScanRanges) {
coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(fragment);
updatedPlanFragmentIds.add(fragmentId);
}
}

if (updatedPlanFragmentIds.isEmpty()) {
continue;
}

DeployState newState = new DeployState();
updatedStates.add(newState);
int index = 0;
for (List<FragmentInstanceExecState> fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) {
List<FragmentInstanceExecState> res = newState.getThreeStageExecutionsToDeploy().get(index);
index += 1;
for (FragmentInstanceExecState execState : fragmentInstanceExecStates) {
if (!updatedPlanFragmentIds.contains(execState.getFragmentId())) {
continue;
}
FragmentInstance instance = execState.getFragmentInstance();
TExecPlanFragmentParams request = deployer.createIncrementalScanRangesRequest(instance);
execState.setRequestToDeploy(request);
res.add(execState);
}
}
}
return updatedStates;
}

private void handleErrorExecution(Status status, FragmentInstanceExecState execution, Throwable failure)
throws UserException, RpcException {
cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR);
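Taken together, this override filters each DeployState down to fragments whose scan nodes still report more ranges, re-runs assignment for those fragments, and returns fresh DeployStates carrying incremental requests; an empty result ends the cycle. The driver loop this enables can be sketched as below (all names illustrative, not the real scheduler API):

    import java.util.ArrayList;
    import java.util.List;

    // Minimal simulation of the incremental deploy loop: ship the current
    // batch, ask for a follow-up batch, stop when nothing is left.
    public class IncrementalDeployLoop {
        interface Deployer { void deploy(List<String> batch); }

        interface BatchSource {
            List<String> nextIncrementalBatch(); // empty list means: done
        }

        static void run(Deployer deployer, BatchSource source, List<String> initial) {
            List<String> batch = initial;
            while (!batch.isEmpty()) {
                deployer.deploy(batch);
                batch = source.nextIncrementalBatch();
            }
        }

        public static void main(String[] args) {
            List<List<String>> followUps = new ArrayList<>(List.of(
                    List.of("range-3", "range-4"),
                    List.of("range-5")));
            run(batch -> System.out.println("deploy " + batch),
                    () -> followUps.isEmpty() ? List.<String>of() : followUps.remove(0),
                    List.of("range-1", "range-2"));
        }
    }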
80 changes: 52 additions & 28 deletions fe/fe-core/src/main/java/com/starrocks/qe/HDFSBackendSelector.java
@@ -43,6 +43,7 @@
import com.starrocks.sql.plan.HDFSScanNodePredicates;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.THdfsScanRange;
import com.starrocks.thrift.TScanRange;
import com.starrocks.thrift.TScanRangeLocation;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TScanRangeParams;
@@ -75,14 +76,13 @@ public class HDFSBackendSelector implements BackendSelector {
Map<ComputeNode, Long> assignedScansPerComputeNode = Maps.newHashMap();
// be -> re-balance bytes
Map<ComputeNode, Long> reBalanceBytesPerComputeNode = Maps.newHashMap();
// be host -> bes
Multimap<String, ComputeNode> hostToBackends = HashMultimap.create();
private final ScanNode scanNode;
private final List<TScanRangeLocations> locations;
private final FragmentScanRangeAssignment assignment;
private final WorkerProvider workerProvider;
private final boolean forceScheduleLocal;
private final boolean shuffleScanRange;
private final boolean useIncrementalScanRanges;
private final int kCandidateNumber = 3;
// After testing, this value can ensure that the scan range size assigned to each BE is as uniform as possible,
// and the largest scan data is not more than 1.1 times of the average value
@@ -152,14 +152,16 @@ public void acceptScanRangeLocations(TScanRangeLocations tScanRangeLocations, Pr
public HDFSBackendSelector(ScanNode scanNode, List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment, WorkerProvider workerProvider,
boolean forceScheduleLocal,
boolean shuffleScanRange) {
boolean shuffleScanRange,
boolean useIncrementalScanRanges) {
this.scanNode = scanNode;
this.locations = locations;
this.assignment = assignment;
this.workerProvider = workerProvider;
this.forceScheduleLocal = forceScheduleLocal;
this.hdfsScanRangeHasher = new HdfsScanRangeHasher();
this.shuffleScanRange = shuffleScanRange;
this.useIncrementalScanRanges = useIncrementalScanRanges;
}

// re-balance scan ranges for compute node if needed, return the compute node which scan range is assigned to
@@ -227,44 +229,66 @@ private long computeTotalSize() {

@Override
public void computeScanRangeAssignment() throws UserException {
computeGeneralAssignment();
if (useIncrementalScanRanges) {
boolean hasMore = scanNode.hasMoreScanRanges();
TScanRangeParams end = new TScanRangeParams();
end.setScan_range(new TScanRange());
end.setPlaceholder(true);
end.setHas_more(hasMore);
for (ComputeNode computeNode : workerProvider.getAllWorkers()) {
assignment.put(computeNode.getId(), scanNode.getId().asInt(), end);
}
}
}

private List<TScanRangeLocations> computeForceScheduleLocalAssignment(long avgNodeScanRangeBytes) throws UserException {
// be host -> bes
Multimap<String, ComputeNode> hostToBackends = HashMultimap.create();
for (ComputeNode computeNode : workerProvider.getAllWorkers()) {
hostToBackends.put(computeNode.getHost(), computeNode);
}

List<TScanRangeLocations> unassigned = Lists.newArrayList();
for (int i = 0; i < locations.size(); ++i) {
TScanRangeLocations scanRangeLocations = locations.get(i);
List<ComputeNode> backends = new ArrayList<>();
// select all backends that are co-located with this scan range.
for (final TScanRangeLocation location : scanRangeLocations.getLocations()) {
Collection<ComputeNode> servers = hostToBackends.get(location.getServer().getHostname());
if (servers.isEmpty()) {
continue;
}
backends.addAll(servers);
}
ComputeNode node =
reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations);
if (node == null) {
unassigned.add(scanRangeLocations);
} else {
recordScanRangeAssignment(node, backends, scanRangeLocations);
}
}
return unassigned;
}

private void computeGeneralAssignment() throws UserException {
if (locations.size() == 0) {
return;
}

long totalSize = computeTotalSize();
long avgNodeScanRangeBytes = totalSize / Math.max(workerProvider.getAllWorkers().size(), 1) + 1;

for (ComputeNode computeNode : workerProvider.getAllWorkers()) {
assignedScansPerComputeNode.put(computeNode, 0L);
reBalanceBytesPerComputeNode.put(computeNode, 0L);
hostToBackends.put(computeNode.getHost(), computeNode);
}

// schedule scan ranges to co-located backends.
// and put rest scan ranges into remote scan ranges.
List<TScanRangeLocations> remoteScanRangeLocations = Lists.newArrayList();
List<TScanRangeLocations> remoteScanRangeLocations = locations;
if (forceScheduleLocal) {
for (int i = 0; i < locations.size(); ++i) {
TScanRangeLocations scanRangeLocations = locations.get(i);
List<ComputeNode> backends = new ArrayList<>();
// select all backends that are co-located with this scan range.
for (final TScanRangeLocation location : scanRangeLocations.getLocations()) {
Collection<ComputeNode> servers = hostToBackends.get(location.getServer().getHostname());
if (servers == null || servers.isEmpty()) {
continue;
}
backends.addAll(servers);
}
ComputeNode node =
reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations);
if (node == null) {
remoteScanRangeLocations.add(scanRangeLocations);
} else {
recordScanRangeAssignment(node, backends, scanRangeLocations);
}
}
} else {
remoteScanRangeLocations = locations;
remoteScanRangeLocations = computeForceScheduleLocalAssignment(avgNodeScanRangeBytes);
}
if (remoteScanRangeLocations.isEmpty()) {
return;
@@ -281,7 +305,7 @@ public void computeScanRangeAssignment() throws UserException {
List<ComputeNode> backends = hashRing.get(scanRangeLocations, kCandidateNumber);
ComputeNode node = reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations);
if (node == null) {
throw new RuntimeException("Failed to find backend to execute");
throw new UserException("Failed to find backend to execute");
}
recordScanRangeAssignment(node, backends, scanRangeLocations);
}
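On the selector side, each incremental batch now ends with a placeholder TScanRangeParams whose placeholder and has_more flags are set for every worker, so a BE can tell whether another delivery will follow for this scan node. A minimal sketch of that end-of-batch marker, with a simplified stand-in for the generated thrift class:

    import java.util.List;

    // Simplified stand-in for thrift TScanRangeParams; field names mirror the
    // struct, but this is not the generated class.
    class SketchScanRangeParams {
        boolean placeholder; // true: carries no data, only batch metadata
        boolean hasMore;     // true: another incremental batch will follow
    }

    class BatchMarkers {
        static SketchScanRangeParams endOfBatch(boolean hasMore) {
            SketchScanRangeParams end = new SketchScanRangeParams();
            end.placeholder = true;
            end.hasMore = hasMore;
            return end;
        }

        // Append one trailing marker per worker so every node learns whether
        // to expect more ranges for this scan node.
        static void sealBatch(List<List<SketchScanRangeParams>> perWorkerAssignments,
                              boolean hasMore) {
            for (List<SketchScanRangeParams> assignment : perWorkerAssignments) {
                assignment.add(endOfBatch(hasMore));
            }
        }
    }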
24 changes: 24 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
@@ -784,6 +784,8 @@ public static MaterializedViewRewriteMode parse(String str) {

public static final String CONNECTOR_REMOTE_FILE_ASYNC_QUEUE_SIZE = "connector_remote_file_async_queue_size";
public static final String CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE = "connector_remote_file_async_task_size";
public static final String ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES = "enable_connector_incremental_scan_ranges";
public static final String CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE = "connector_incremental_scan_ranges_size";

public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
@@ -2135,6 +2137,12 @@ public SessionVariable setHiveTempStagingDir(String hiveTempStagingDir) {
@VarAttr(name = CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE, flag = VariableMgr.INVISIBLE)
private int connectorRemoteFileAsyncTaskSize = 4;

@VarAttr(name = ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES)
private boolean enableConnectorIncrementalScanRanges = false;

@VarAttr(name = CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE)
private int connectorIncrementalScanRangeSize = 1000;

public SessionVariableConstants.ChooseInstancesMode getChooseExecuteInstancesMode() {
return Enums.getIfPresent(SessionVariableConstants.ChooseInstancesMode.class,
StringUtils.upperCase(chooseExecuteInstancesMode))
@@ -4134,6 +4142,22 @@ public int getConnectorRemoteFileAsyncTaskSize() {
return connectorRemoteFileAsyncTaskSize;
}

public int getConnectorIncrementalScanRangeNumber() {
return connectorIncrementalScanRangeSize;
}

public void setConnectorIncrementalScanRangeNumber(int v) {
connectorIncrementalScanRangeSize = v;
}

public boolean isEnableConnectorIncrementalScanRanges() {
return enableConnectorIncrementalScanRanges;
}

public void setEnableConnectorIncrementalScanRanges(boolean v) {
enableConnectorIncrementalScanRanges = v;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
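The feature is gated by two new session variables: enable_connector_incremental_scan_ranges (off by default) and connector_incremental_scan_ranges_size, which caps how many ranges each batch carries (default 1000); a session would typically opt in with SET enable_connector_incremental_scan_ranges = true. A hedged sketch of how the two knobs plug together (stand-in types, not the real SessionVariable or JobSpec classes):

    // Sketch: the switch is consulted once at prepare time, and the size
    // becomes the per-call cap handed to the scan-range source.
    class SketchSessionVariable {
        boolean enableConnectorIncrementalScanRanges = false; // default: off
        int connectorIncrementalScanRangeSize = 1000;         // default: 1000 per batch
    }

    class SketchJobSpec {
        boolean incrementalScanRanges = false;
    }

    class PrepareSketch {
        static long scanRangeBatchCap(SketchSessionVariable sv, SketchJobSpec jobSpec) {
            if (sv.enableConnectorIncrementalScanRanges) {
                jobSpec.incrementalScanRanges = true;
                return sv.connectorIncrementalScanRangeSize; // cap each batch
            }
            return 0; // 0 keeps the old drain-everything behavior
        }
    }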
@@ -16,6 +16,7 @@

import com.starrocks.analysis.DescriptorTable;
import com.starrocks.common.Status;
import com.starrocks.common.UserException;
import com.starrocks.common.util.RuntimeProfile;
import com.starrocks.datacache.DataCacheSelectMetrics;
import com.starrocks.planner.PlanFragment;
@@ -26,6 +27,7 @@
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.QueryStatisticsItem;
import com.starrocks.qe.RowBatch;
import com.starrocks.qe.scheduler.slot.DeployState;
import com.starrocks.qe.scheduler.slot.LogicalSlot;
import com.starrocks.sql.LoadPlanner;
import com.starrocks.sql.plan.ExecPlan;
@@ -80,8 +82,8 @@ Coordinator createBrokerExportScheduler(Long jobId, TUniqueId queryId, DescriptorTable
long warehouseId);

Coordinator createRefreshDictionaryCacheScheduler(ConnectContext context, TUniqueId queryId,
DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes);
DescriptorTable descTable, List<PlanFragment> fragments,
List<ScanNode> scanNodes);
}

// ------------------------------------------------------------------------------------
@@ -129,6 +131,11 @@ public void cancel(String cancelledMessage) {

public abstract void cancel(PPlanFragmentCancelReason reason, String message);

public List<DeployState> assignIncrementalScanRangesToDeployStates(Deployer deployer, List<DeployState> deployStates)
throws UserException {
return List.of();
}

public abstract void onFinished();

public abstract LogicalSlot getSlot();
