From 234d20afe0cbdc8f43fb6e679be649a8e584c96a Mon Sep 17 00:00:00 2001 From: RyanZ Date: Wed, 4 Sep 2024 10:16:44 +0800 Subject: [PATCH] [Refactor] support incremental scan ranges deployment (#50189) Signed-off-by: yanz --- .../com/starrocks/planner/HdfsScanNode.java | 10 +- .../java/com/starrocks/planner/ScanNode.java | 4 + .../starrocks/qe/CoordinatorPreprocessor.java | 9 ++ .../com/starrocks/qe/DefaultCoordinator.java | 67 +++++++- .../com/starrocks/qe/HDFSBackendSelector.java | 80 ++++++---- .../com/starrocks/qe/SessionVariable.java | 24 +++ .../starrocks/qe/scheduler/Coordinator.java | 11 +- .../com/starrocks/qe/scheduler/Deployer.java | 5 + .../scheduler/TFragmentInstanceFactory.java | 12 ++ .../assignment/BackendSelectorFactory.java | 18 ++- .../FragmentAssignmentStrategyFactory.java | 2 +- .../LocalFragmentAssignmentStrategy.java | 35 ++-- .../dag/AllAtOnceExecutionSchedule.java | 18 ++- .../qe/scheduler/dag/ExecutionSchedule.java | 3 +- .../dag/FragmentInstanceExecState.java | 19 ++- .../starrocks/qe/scheduler/dag/JobSpec.java | 12 ++ .../dag/PhasedExecutionSchedule.java | 19 ++- .../starrocks/qe/HDFSBackendSelectorTest.java | 74 ++++++++- .../scheduler/IncrementalDeployHiveTest.java | 149 ++++++++++++++++++ gensrc/thrift/InternalService.thrift | 2 + 20 files changed, 514 insertions(+), 59 deletions(-) create mode 100644 fe/fe-core/src/test/java/com/starrocks/qe/scheduler/IncrementalDeployHiveTest.java diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/HdfsScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/HdfsScanNode.java index 555d60b8e38a3..aabb36495b357 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/HdfsScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/HdfsScanNode.java @@ -106,7 +106,15 @@ private void setupCloudCredential() { @Override public List getScanRangeLocations(long maxScanRangeLength) { - return scanRangeSource.getAllOutputs(); + if (maxScanRangeLength == 0) { + return scanRangeSource.getAllOutputs(); + } + return scanRangeSource.getOutputs((int) maxScanRangeLength); + } + + @Override + public boolean hasMoreScanRanges() { + return scanRangeSource.hasMoreOutput(); } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java index e86214c837fd9..a706a9656c347 100644 --- a/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/com/starrocks/planner/ScanNode.java @@ -100,6 +100,10 @@ public boolean isLocalNativeTable() { return false; } + public boolean hasMoreScanRanges() { + return false; + } + /** * cast expr to SlotDescriptor type */ diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java index dbe45251071e3..1d58d60024235 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/CoordinatorPreprocessor.java @@ -36,6 +36,7 @@ import com.starrocks.qe.scheduler.assignment.FragmentAssignmentStrategyFactory; import com.starrocks.qe.scheduler.dag.ExecutionDAG; import com.starrocks.qe.scheduler.dag.ExecutionFragment; +import com.starrocks.qe.scheduler.dag.FragmentInstance; import com.starrocks.qe.scheduler.dag.JobSpec; import com.starrocks.server.GlobalStateMgr; import com.starrocks.server.RunMode; @@ -258,6 +259,14 @@ void computeFragmentInstances() throws UserException { executionDAG.finalizeDAG(); } + public void 
assignIncrementalScanRangesToFragmentInstances(ExecutionFragment execFragment) throws UserException { + execFragment.getScanRangeAssignment().clear(); + for (FragmentInstance instance : execFragment.getInstances()) { + instance.getNode2ScanRanges().clear(); + } + fragmentAssignmentStrategyFactory.create(execFragment, workerProvider).assignFragmentToWorker(execFragment); + } + private void validateExecutionDAG() throws StarRocksPlannerException { for (ExecutionFragment execFragment : executionDAG.getFragmentsInPreorder()) { DataSink sink = execFragment.getPlanFragment().getSink(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java index 466bdf878c269..78b16857ffa19 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/DefaultCoordinator.java @@ -56,6 +56,7 @@ import com.starrocks.datacache.DataCacheSelectMetrics; import com.starrocks.mysql.MysqlCommand; import com.starrocks.planner.PlanFragment; +import com.starrocks.planner.PlanFragmentId; import com.starrocks.planner.ResultSink; import com.starrocks.planner.RuntimeFilterDescription; import com.starrocks.planner.ScanNode; @@ -74,6 +75,7 @@ import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState; import com.starrocks.qe.scheduler.dag.JobSpec; import com.starrocks.qe.scheduler.dag.PhasedExecutionSchedule; +import com.starrocks.qe.scheduler.slot.DeployState; import com.starrocks.qe.scheduler.slot.LogicalSlot; import com.starrocks.rpc.RpcException; import com.starrocks.server.GlobalStateMgr; @@ -82,6 +84,7 @@ import com.starrocks.sql.plan.ExecPlan; import com.starrocks.system.ComputeNode; import com.starrocks.thrift.TDescriptorTable; +import com.starrocks.thrift.TExecPlanFragmentParams; import com.starrocks.thrift.TLoadJobType; import com.starrocks.thrift.TNetworkAddress; import com.starrocks.thrift.TQueryType; @@ -101,6 +104,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Objects; @@ -478,6 +482,12 @@ public void prepareExec() throws Exception { jobSpec.getFragments().forEach(fragment -> fragment.limitMaxPipelineDop(slot.getPipelineDop())); } + if (connectContext != null) { + if (connectContext.getSessionVariable().isEnableConnectorIncrementalScanRanges()) { + jobSpec.setIncrementalScanRanges(true); + } + } + coordinatorPreprocessor.prepareExec(); prepareResultSink(); @@ -636,7 +646,7 @@ private void deliverExecFragments(boolean needDeploy) throws RpcException, UserE Deployer deployer = new Deployer(connectContext, jobSpec, executionDAG, coordinatorPreprocessor.getCoordAddress(), this::handleErrorExecution, needDeploy); - schedule.prepareSchedule(deployer, executionDAG); + schedule.prepareSchedule(this, deployer, executionDAG); this.schedule.schedule(); queryProfile.attachExecutionProfiles(executionDAG.getExecutions()); } finally { @@ -644,6 +654,61 @@ private void deliverExecFragments(boolean needDeploy) throws RpcException, UserE } } + @Override + public List assignIncrementalScanRangesToDeployStates(Deployer deployer, List deployStates) + throws UserException { + List updatedStates = new ArrayList<>(); + if (!jobSpec.isIncrementalScanRanges()) { + return updatedStates; + } + for (DeployState state : deployStates) { + + Set planFragmentIds = new HashSet<>(); + for (List fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) 
{ + for (FragmentInstanceExecState execState : fragmentInstanceExecStates) { + planFragmentIds.add(execState.getFragmentId()); + } + } + + Set updatedPlanFragmentIds = new HashSet<>(); + for (PlanFragmentId fragmentId : planFragmentIds) { + boolean hasMoreScanRanges = false; + ExecutionFragment fragment = executionDAG.getFragment(fragmentId); + for (ScanNode scanNode : fragment.getScanNodes()) { + if (scanNode.hasMoreScanRanges()) { + hasMoreScanRanges = true; + } + } + if (hasMoreScanRanges) { + coordinatorPreprocessor.assignIncrementalScanRangesToFragmentInstances(fragment); + updatedPlanFragmentIds.add(fragmentId); + } + } + + if (updatedPlanFragmentIds.isEmpty()) { + continue; + } + + DeployState newState = new DeployState(); + updatedStates.add(newState); + int index = 0; + for (List fragmentInstanceExecStates : state.getThreeStageExecutionsToDeploy()) { + List res = newState.getThreeStageExecutionsToDeploy().get(index); + index += 1; + for (FragmentInstanceExecState execState : fragmentInstanceExecStates) { + if (!updatedPlanFragmentIds.contains(execState.getFragmentId())) { + continue; + } + FragmentInstance instance = execState.getFragmentInstance(); + TExecPlanFragmentParams request = deployer.createIncrementalScanRangesRequest(instance); + execState.setRequestToDeploy(request); + res.add(execState); + } + } + } + return updatedStates; + } + private void handleErrorExecution(Status status, FragmentInstanceExecState execution, Throwable failure) throws UserException, RpcException { cancelInternal(PPlanFragmentCancelReason.INTERNAL_ERROR); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/HDFSBackendSelector.java b/fe/fe-core/src/main/java/com/starrocks/qe/HDFSBackendSelector.java index 7f320de155f4e..52a8e826ffdcb 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/HDFSBackendSelector.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/HDFSBackendSelector.java @@ -43,6 +43,7 @@ import com.starrocks.sql.plan.HDFSScanNodePredicates; import com.starrocks.system.ComputeNode; import com.starrocks.thrift.THdfsScanRange; +import com.starrocks.thrift.TScanRange; import com.starrocks.thrift.TScanRangeLocation; import com.starrocks.thrift.TScanRangeLocations; import com.starrocks.thrift.TScanRangeParams; @@ -75,14 +76,13 @@ public class HDFSBackendSelector implements BackendSelector { Map assignedScansPerComputeNode = Maps.newHashMap(); // be -> re-balance bytes Map reBalanceBytesPerComputeNode = Maps.newHashMap(); - // be host -> bes - Multimap hostToBackends = HashMultimap.create(); private final ScanNode scanNode; private final List locations; private final FragmentScanRangeAssignment assignment; private final WorkerProvider workerProvider; private final boolean forceScheduleLocal; private final boolean shuffleScanRange; + private final boolean useIncrementalScanRanges; private final int kCandidateNumber = 3; // After testing, this value can ensure that the scan range size assigned to each BE is as uniform as possible, // and the largest scan data is not more than 1.1 times of the average value @@ -152,7 +152,8 @@ public void acceptScanRangeLocations(TScanRangeLocations tScanRangeLocations, Pr public HDFSBackendSelector(ScanNode scanNode, List locations, FragmentScanRangeAssignment assignment, WorkerProvider workerProvider, boolean forceScheduleLocal, - boolean shuffleScanRange) { + boolean shuffleScanRange, + boolean useIncrementalScanRanges) { this.scanNode = scanNode; this.locations = locations; this.assignment = assignment; @@ -160,6 +161,7 @@ public 
HDFSBackendSelector(ScanNode scanNode, List location this.forceScheduleLocal = forceScheduleLocal; this.hdfsScanRangeHasher = new HdfsScanRangeHasher(); this.shuffleScanRange = shuffleScanRange; + this.useIncrementalScanRanges = useIncrementalScanRanges; } // re-balance scan ranges for compute node if needed, return the compute node which scan range is assigned to @@ -227,44 +229,66 @@ private long computeTotalSize() { @Override public void computeScanRangeAssignment() throws UserException { + computeGeneralAssignment(); + if (useIncrementalScanRanges) { + boolean hasMore = scanNode.hasMoreScanRanges(); + TScanRangeParams end = new TScanRangeParams(); + end.setScan_range(new TScanRange()); + end.setPlaceholder(true); + end.setHas_more(hasMore); + for (ComputeNode computeNode : workerProvider.getAllWorkers()) { + assignment.put(computeNode.getId(), scanNode.getId().asInt(), end); + } + } + } + + private List computeForceScheduleLocalAssignment(long avgNodeScanRangeBytes) throws UserException { + // be host -> bes + Multimap hostToBackends = HashMultimap.create(); + for (ComputeNode computeNode : workerProvider.getAllWorkers()) { + hostToBackends.put(computeNode.getHost(), computeNode); + } + + List unassigned = Lists.newArrayList(); + for (int i = 0; i < locations.size(); ++i) { + TScanRangeLocations scanRangeLocations = locations.get(i); + List backends = new ArrayList<>(); + // select all backends that are co-located with this scan range. + for (final TScanRangeLocation location : scanRangeLocations.getLocations()) { + Collection servers = hostToBackends.get(location.getServer().getHostname()); + if (servers.isEmpty()) { + continue; + } + backends.addAll(servers); + } + ComputeNode node = + reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations); + if (node == null) { + unassigned.add(scanRangeLocations); + } else { + recordScanRangeAssignment(node, backends, scanRangeLocations); + } + } + return unassigned; + } + + private void computeGeneralAssignment() throws UserException { if (locations.size() == 0) { return; } long totalSize = computeTotalSize(); long avgNodeScanRangeBytes = totalSize / Math.max(workerProvider.getAllWorkers().size(), 1) + 1; - for (ComputeNode computeNode : workerProvider.getAllWorkers()) { assignedScansPerComputeNode.put(computeNode, 0L); reBalanceBytesPerComputeNode.put(computeNode, 0L); - hostToBackends.put(computeNode.getHost(), computeNode); } // schedule scan ranges to co-located backends. // and put rest scan ranges into remote scan ranges. - List remoteScanRangeLocations = Lists.newArrayList(); + List remoteScanRangeLocations = locations; if (forceScheduleLocal) { - for (int i = 0; i < locations.size(); ++i) { - TScanRangeLocations scanRangeLocations = locations.get(i); - List backends = new ArrayList<>(); - // select all backends that are co-located with this scan range. 
- for (final TScanRangeLocation location : scanRangeLocations.getLocations()) { - Collection servers = hostToBackends.get(location.getServer().getHostname()); - if (servers == null || servers.isEmpty()) { - continue; - } - backends.addAll(servers); - } - ComputeNode node = - reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations); - if (node == null) { - remoteScanRangeLocations.add(scanRangeLocations); - } else { - recordScanRangeAssignment(node, backends, scanRangeLocations); - } - } - } else { - remoteScanRangeLocations = locations; + remoteScanRangeLocations = computeForceScheduleLocalAssignment(avgNodeScanRangeBytes); } if (remoteScanRangeLocations.isEmpty()) { return; @@ -281,7 +305,7 @@ public void computeScanRangeAssignment() throws UserException { List backends = hashRing.get(scanRangeLocations, kCandidateNumber); ComputeNode node = reBalanceScanRangeForComputeNode(backends, avgNodeScanRangeBytes, scanRangeLocations); if (node == null) { - throw new RuntimeException("Failed to find backend to execute"); + throw new UserException("Failed to find backend to execute"); } recordScanRangeAssignment(node, backends, scanRangeLocations); } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java index 684aec3fa23f9..71fdd09d73d82 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java @@ -784,6 +784,8 @@ public static MaterializedViewRewriteMode parse(String str) { public static final String CONNECTOR_REMOTE_FILE_ASYNC_QUEUE_SIZE = "connector_remote_file_async_queue_size"; public static final String CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE = "connector_remote_file_async_task_size"; + public static final String ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES = "enable_connector_incremental_scan_ranges"; + public static final String CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE = "connector_incremental_scan_ranges_size"; public static final List DEPRECATED_VARIABLES = ImmutableList.builder() .add(CODEGEN_LEVEL) @@ -2135,6 +2137,12 @@ public SessionVariable setHiveTempStagingDir(String hiveTempStagingDir) { @VarAttr(name = CONNECTOR_REMOTE_FILE_ASYNC_TASK_SIZE, flag = VariableMgr.INVISIBLE) private int connectorRemoteFileAsyncTaskSize = 4; + @VarAttr(name = ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES) + private boolean enableConnectorIncrementalScanRanges = false; + + @VarAttr(name = CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE) + private int connectorIncrementalScanRangeSize = 1000; + public SessionVariableConstants.ChooseInstancesMode getChooseExecuteInstancesMode() { return Enums.getIfPresent(SessionVariableConstants.ChooseInstancesMode.class, StringUtils.upperCase(chooseExecuteInstancesMode)) @@ -4134,6 +4142,22 @@ public int getConnectorRemoteFileAsyncTaskSize() { return connectorRemoteFileAsyncTaskSize; } + public int getConnectorIncrementalScanRangeNumber() { + return connectorIncrementalScanRangeSize; + } + + public void setConnectorIncrementalScanRangeNumber(int v) { + connectorIncrementalScanRangeSize = v; + } + + public boolean isEnableConnectorIncrementalScanRanges() { + return enableConnectorIncrementalScanRanges; + } + + public void setEnableConnectorIncrementalScanRanges(boolean v) { + enableConnectorIncrementalScanRanges = v; + } + // Serialize to thrift object // used for rest api public TQueryOptions toThrift() { diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java 
b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java index 0e9763ce517a3..8eb82accb1d27 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Coordinator.java @@ -16,6 +16,7 @@ import com.starrocks.analysis.DescriptorTable; import com.starrocks.common.Status; +import com.starrocks.common.UserException; import com.starrocks.common.util.RuntimeProfile; import com.starrocks.datacache.DataCacheSelectMetrics; import com.starrocks.planner.PlanFragment; @@ -26,6 +27,7 @@ import com.starrocks.qe.ConnectContext; import com.starrocks.qe.QueryStatisticsItem; import com.starrocks.qe.RowBatch; +import com.starrocks.qe.scheduler.slot.DeployState; import com.starrocks.qe.scheduler.slot.LogicalSlot; import com.starrocks.sql.LoadPlanner; import com.starrocks.sql.plan.ExecPlan; @@ -80,8 +82,8 @@ Coordinator createBrokerExportScheduler(Long jobId, TUniqueId queryId, Descripto long warehouseId); Coordinator createRefreshDictionaryCacheScheduler(ConnectContext context, TUniqueId queryId, - DescriptorTable descTable, List fragments, - List scanNodes); + DescriptorTable descTable, List fragments, + List scanNodes); } // ------------------------------------------------------------------------------------ @@ -129,6 +131,11 @@ public void cancel(String cancelledMessage) { public abstract void cancel(PPlanFragmentCancelReason reason, String message); + public List assignIncrementalScanRangesToDeployStates(Deployer deployer, List deployStates) + throws UserException { + return List.of(); + } + public abstract void onFinished(); public abstract LogicalSlot getSlot(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java index e32af9c11ca3f..8652eb6c910f4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/Deployer.java @@ -204,6 +204,7 @@ private void createFragmentInstanceExecStates(ExecutionFragment fragment, fragment.getFragmentIndex(), request, instance.getWorker()); + execution.setFragmentInstance(instance); threeStageExecutionsToDeploy.get(stageIndex).add(execution); @@ -250,5 +251,9 @@ private void waitForDeploymentCompletion(List executi failureHandler.apply(firstErrResult.getStatus(), firstErrExecution, firstErrResult.getFailure()); } } + + public TExecPlanFragmentParams createIncrementalScanRangesRequest(FragmentInstance instance) { + return tFragmentInstanceFactory.createIncrementalScanRanges(instance); + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/TFragmentInstanceFactory.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/TFragmentInstanceFactory.java index 0ab62557d79f5..7862eb27ac735 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/TFragmentInstanceFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/TFragmentInstanceFactory.java @@ -39,6 +39,7 @@ import java.util.ArrayList; import java.util.Collections; +import java.util.HashMap; import java.util.List; public class TFragmentInstanceFactory { @@ -90,6 +91,17 @@ public TExecPlanFragmentParams create(FragmentInstance instance, return result; } + public TExecPlanFragmentParams createIncrementalScanRanges(FragmentInstance instance) { + TExecPlanFragmentParams result = new TExecPlanFragmentParams(); + result.setProtocol_version(InternalServiceVersion.V1); + result.setParams(new TPlanFragmentExecParams()); + 
result.params.setQuery_id(jobSpec.getQueryId()); + result.params.setFragment_instance_id(instance.getInstanceId()); + result.params.setPer_node_scan_ranges(instance.getNode2ScanRanges()); + result.params.setPer_exch_num_senders(new HashMap<>()); + return result; + } + public void toThriftFromCommonParams(TExecPlanFragmentParams result, ExecutionFragment execFragment, TDescriptorTable descTable, diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java index dcb2ad39ad264..c4e6054f07a26 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/BackendSelectorFactory.java @@ -50,16 +50,22 @@ public static BackendSelector create(ScanNode scanNode, ExecutionFragment execFragment, WorkerProvider workerProvider, ConnectContext connectContext, - Set destReplicatedScanIds) { + Set destReplicatedScanIds, + boolean useIncrementalScanRanges) { + SessionVariable sessionVariable = connectContext.getSessionVariable(); + FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment(); + // The parameters of getScanRangeLocations may ignore, It doesn't take effect. - List locations = scanNode.getScanRangeLocations(0); + int maxScanRangeLength = 0; + if (useIncrementalScanRanges) { + maxScanRangeLength = sessionVariable.getConnectorIncrementalScanRangeNumber(); + } + + List locations = scanNode.getScanRangeLocations(maxScanRangeLength); if (locations == null) { return new NoopBackendSelector(); } - SessionVariable sessionVariable = connectContext.getSessionVariable(); - FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment(); - if (scanNode instanceof SchemaScanNode) { return new NormalBackendSelector(scanNode, locations, assignment, workerProvider, false); } else if (scanNode instanceof HdfsScanNode || scanNode instanceof IcebergScanNode || @@ -68,7 +74,7 @@ public static BackendSelector create(ScanNode scanNode, || scanNode instanceof OdpsScanNode || scanNode instanceof IcebergMetadataScanNode) { return new HDFSBackendSelector(scanNode, locations, assignment, workerProvider, sessionVariable.getForceScheduleLocal(), - sessionVariable.getHDFSBackendSelectorScanRangeShuffle()); + sessionVariable.getHDFSBackendSelectorScanRangeShuffle(), useIncrementalScanRanges); } else { boolean hasColocate = execFragment.isColocated(); boolean hasBucket = execFragment.isLocalBucketShuffleJoin(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/FragmentAssignmentStrategyFactory.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/FragmentAssignmentStrategyFactory.java index 2cfec4a800529..e9f84e8216585 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/FragmentAssignmentStrategyFactory.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/FragmentAssignmentStrategyFactory.java @@ -48,7 +48,7 @@ public FragmentAssignmentStrategy create(ExecutionFragment execFragment, WorkerP executionDAG.isGatherOutput(), random); } else { return new LocalFragmentAssignmentStrategy(connectContext, workerProvider, jobSpec.isEnablePipeline(), - jobSpec.isLoadType()); + jobSpec.isLoadType(), jobSpec.isIncrementalScanRanges()); } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java 
b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java index f0fc3a9b96171..c451b00ddb142 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/assignment/LocalFragmentAssignmentStrategy.java @@ -23,6 +23,7 @@ import com.starrocks.qe.BackendSelector; import com.starrocks.qe.ColocatedBackendSelector; import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.FragmentScanRangeAssignment; import com.starrocks.qe.SessionVariable; import com.starrocks.qe.scheduler.WorkerProvider; import com.starrocks.qe.scheduler.dag.ExecutionFragment; @@ -33,6 +34,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -53,13 +55,17 @@ public class LocalFragmentAssignmentStrategy implements FragmentAssignmentStrate private final Set replicatedScanIds = Sets.newHashSet(); + private final boolean useIncrementalScanRanges; + public LocalFragmentAssignmentStrategy(ConnectContext connectContext, WorkerProvider workerProvider, boolean usePipeline, - boolean isLoadType) { + boolean isLoadType, + boolean useIncrementalScanRanges) { this.connectContext = connectContext; this.workerProvider = workerProvider; this.usePipeline = usePipeline; this.isLoadType = isLoadType; + this.useIncrementalScanRanges = useIncrementalScanRanges; } @Override @@ -82,7 +88,7 @@ public void assignFragmentToWorker(ExecutionFragment execFragment) throws UserEx private void assignScanRangesToWorker(ExecutionFragment execFragment, ScanNode scanNode) throws UserException { BackendSelector backendSelector = BackendSelectorFactory.create( - scanNode, isLoadType, execFragment, workerProvider, connectContext, replicatedScanIds); + scanNode, isLoadType, execFragment, workerProvider, connectContext, replicatedScanIds, useIncrementalScanRanges); backendSelector.computeScanRangeAssignment(); @@ -181,7 +187,8 @@ private void assignScanRangesToColocateFragmentInstancePerWorker( FragmentInstance instance = new FragmentInstance(worker, execFragment); execFragment.addInstance(instance); - // record each instance replicate scan id in set, to avoid add replicate scan range repeatedly when they are in different buckets + // record each instance replicate scan id in set, to avoid add replicate scan range repeatedly + // when they are in different buckets Set instanceReplicatedScanIds = new HashSet<>(); if (!assignPerDriverSeq) { @@ -239,7 +246,14 @@ private void assignScanRangesToNormalFragmentInstancePerWorker(ExecutionFragment final int parallelExecInstanceNum = fragment.getParallelExecNum(); final int pipelineDop = fragment.getPipelineDop(); - execFragment.getScanRangeAssignment().forEach((workerId, scanRangesPerWorker) -> { + FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment(); + final Map fragmentInstanceMap = new HashMap<>(); + if (!execFragment.getInstances().isEmpty()) { + for (FragmentInstance fragmentInstance : execFragment.getInstances()) { + fragmentInstanceMap.put(fragmentInstance.getWorkerId(), fragmentInstance); + } + } + assignment.forEach((workerId, scanRangesPerWorker) -> { // 1. 
Handle normal scan node firstly scanRangesPerWorker.forEach((scanId, scanRangesOfNode) -> { if (replicatedScanIds.contains(scanId)) { @@ -249,12 +263,15 @@ private void assignScanRangesToNormalFragmentInstancePerWorker(ExecutionFragment int expectedInstanceNum = Math.max(1, parallelExecInstanceNum); List> scanRangesPerInstance = ListUtil.splitBySize(scanRangesOfNode, expectedInstanceNum); - for (List scanRanges : scanRangesPerInstance) { - FragmentInstance instance = - new FragmentInstance(workerProvider.getWorkerById(workerId), execFragment); - execFragment.addInstance(instance); - + FragmentInstance instance = null; + if (useIncrementalScanRanges && !fragmentInstanceMap.isEmpty()) { + instance = fragmentInstanceMap.get(workerId); + } else { + instance = + new FragmentInstance(workerProvider.getWorkerById(workerId), execFragment); + execFragment.addInstance(instance); + } if (!enableAssignScanRangesPerDriverSeq(fragment, scanRanges)) { instance.addScanRanges(scanId, scanRanges); fragment.disablePhysicalPropertyOptimize(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java index 88f70a01175c5..e5cd86032df0b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java @@ -15,29 +15,45 @@ package com.starrocks.qe.scheduler.dag; import com.starrocks.common.UserException; +import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.qe.scheduler.Deployer; import com.starrocks.qe.scheduler.slot.DeployState; import com.starrocks.rpc.RpcException; import com.starrocks.thrift.TUniqueId; +import java.util.ArrayList; import java.util.List; // all at once execution schedule only schedule once. 
public class AllAtOnceExecutionSchedule implements ExecutionSchedule { + private Coordinator coordinator; private Deployer deployer; private ExecutionDAG dag; @Override - public void prepareSchedule(Deployer deployer, ExecutionDAG dag) { + public void prepareSchedule(Coordinator coordinator, Deployer deployer, ExecutionDAG dag) { + this.coordinator = coordinator; this.deployer = deployer; this.dag = dag; } @Override public void schedule() throws RpcException, UserException { + List states = new ArrayList<>(); for (List executionFragments : dag.getFragmentsInTopologicalOrderFromRoot()) { final DeployState deployState = deployer.createFragmentExecStates(executionFragments); deployer.deployFragments(deployState); + states.add(deployState); + } + + while (true) { + states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states); + if (states.isEmpty()) { + break; + } + for (DeployState state : states) { + deployer.deployFragments(state); + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionSchedule.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionSchedule.java index b4c70f6cfddf2..85565a383ccb9 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionSchedule.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/ExecutionSchedule.java @@ -15,12 +15,13 @@ package com.starrocks.qe.scheduler.dag; import com.starrocks.common.UserException; +import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.qe.scheduler.Deployer; import com.starrocks.rpc.RpcException; import com.starrocks.thrift.TUniqueId; public interface ExecutionSchedule { - void prepareSchedule(Deployer deployer, ExecutionDAG dag); + void prepareSchedule(Coordinator coordinator, Deployer deployer, ExecutionDAG dag); void schedule() throws RpcException, UserException; diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java index 05b9fe7cd16e6..a2db0077f1a61 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java @@ -95,6 +95,8 @@ public class FragmentInstanceExecState { private final TNetworkAddress address; private final long lastMissingHeartbeatTime; + private FragmentInstance fragmentInstance; + /** * Create a fake backendExecState, only user for stream load profile. 
*/ @@ -127,7 +129,6 @@ public static FragmentInstanceExecState createExecution(JobSpec jobSpec, request, profile, worker, address, worker.getLastMissingHeartbeatTime()); - } private FragmentInstanceExecState(JobSpec jobSpec, @@ -483,4 +484,20 @@ public boolean isTerminal() { return this == FINISHED || this == FAILED; } } + + public FragmentInstance getFragmentInstance() { + return fragmentInstance; + } + + public void setFragmentInstance(FragmentInstance fragmentInstance) { + this.fragmentInstance = fragmentInstance; + } + + public TExecPlanFragmentParams getRequestToDeploy() { + return requestToDeploy; + } + + public void setRequestToDeploy(TExecPlanFragmentParams requestToDeploy) { + this.requestToDeploy = requestToDeploy; + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java index 43cdd65b64c22..68fd6a3f2026b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/JobSpec.java @@ -93,6 +93,8 @@ public long getWarehouseId() { private boolean needQueued = false; private boolean enableGroupLevelQueue = false; + private boolean incrementalScanRanges = false; + public static class Factory { private Factory() { } @@ -488,6 +490,14 @@ public String getPlanProtocol() { return planProtocol; } + public boolean isIncrementalScanRanges() { + return incrementalScanRanges; + } + + public void setIncrementalScanRanges(boolean v) { + incrementalScanRanges = v; + } + public void reset() { fragments.forEach(PlanFragment::reset); } @@ -499,6 +509,7 @@ public SlotProvider getSlotProvider() { return GlobalStateMgr.getCurrentState().getGlobalSlotProvider(); } } + public boolean hasOlapTableSink() { for (PlanFragment fragment : fragments) { if (fragment.hasOlapTableSink()) { @@ -507,6 +518,7 @@ public boolean hasOlapTableSink() { } return false; } + public static class Builder { private final JobSpec instance = new JobSpec(); diff --git a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java index 3f489d10357f8..a6e2714d8f2a4 100644 --- a/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java +++ b/fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/PhasedExecutionSchedule.java @@ -22,6 +22,7 @@ import com.starrocks.common.util.UnionFind; import com.starrocks.planner.PlanFragmentId; import com.starrocks.qe.ConnectContext; +import com.starrocks.qe.scheduler.Coordinator; import com.starrocks.qe.scheduler.Deployer; import com.starrocks.qe.scheduler.slot.DeployState; import com.starrocks.rpc.RpcException; @@ -59,6 +60,7 @@ List getFragments() { } } + private Coordinator coordinator; private Deployer deployer; private ExecutionDAG dag; @@ -106,7 +108,8 @@ public PhasedExecutionSchedule(ConnectContext context) { // fragment id -> fragment child schedule order private Map sequenceMap = Maps.newHashMap(); - public void prepareSchedule(Deployer deployer, ExecutionDAG dag) { + public void prepareSchedule(Coordinator coordinator, Deployer deployer, ExecutionDAG dag) { + this.coordinator = coordinator; this.deployer = deployer; this.dag = dag; ExecutionFragment rootFragment = dag.getRootFragment(); @@ -213,11 +216,21 @@ private void doDeploy() throws RpcException, UserException { if (deployStates.isEmpty()) { return; } - final List deployState = deployStates.poll(); + List 
deployState = deployStates.poll(); Preconditions.checkState(deployState != null); for (DeployState state : deployState) { deployer.deployFragments(state); } + + while (true) { + deployState = coordinator.assignIncrementalScanRangesToDeployStates(deployer, deployState); + if (deployState.isEmpty()) { + break; + } + for (DeployState state : deployState) { + deployer.deployFragments(state); + } + } } public void tryScheduleNextTurn(TUniqueId fragmentInstanceId) throws RpcException, UserException { @@ -315,7 +328,7 @@ private boolean scheduleNext(List> scheduleFragments) { } if (groups.size() != fragment.childrenSize()) { - List fragments = Lists.newArrayList(); + List fragments = Lists.newArrayList(); for (int i = fragment.childrenSize() - 1; i >= 0; i--) { final ExecutionFragment child = sequenceMap.get(fragment.getFragmentId()).getAt(i); final PlanFragmentId childFragmentId = child.getFragmentId(); diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/HDFSBackendSelectorTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/HDFSBackendSelectorTest.java index 560bc4e1381df..d0d90e58dbb94 100644 --- a/fe/fe-core/src/test/java/com/starrocks/qe/HDFSBackendSelectorTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/qe/HDFSBackendSelectorTest.java @@ -132,7 +132,8 @@ public void testHdfsScanNodeHashRing() throws Exception { ); HDFSBackendSelector selector = - new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, false, false); + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + false, false, false); selector.computeScanRangeAssignment(); int avg = (scanRangeNumber * scanRangeSize) / hostNumber; @@ -152,7 +153,8 @@ public void testHdfsScanNodeHashRing() throws Exception { true ); selector = - new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, false, false); + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + false, false, false); try { selector.computeScanRangeAssignment(); Assert.fail(); @@ -198,7 +200,8 @@ public void testHdfsScanNodeScanRangeReBalance() throws Exception { ); HDFSBackendSelector selector = - new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, false, false); + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + false, false, false); selector.computeScanRangeAssignment(); long avg = (scanRangeNumber * scanRangeSize) / hostNumber + 1; @@ -245,7 +248,8 @@ public void testHashRingAlgorithm() { true ); HDFSBackendSelector selector = - new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, false, false); + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + false, false, false); HashRing hashRing = selector.makeHashRing(); Assert.assertTrue(hashRing.policy().equals("ConsistentHash")); ConsistentHashRing consistentHashRing = (ConsistentHashRing) hashRing; @@ -304,7 +308,8 @@ public void testHdfsScanNodeForceScheduleLocal() throws Exception { ); HDFSBackendSelector selector = - new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, true, false); + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + true, false, false); selector.computeScanRangeAssignment(); Map stats = computeWorkerIdToReadBytes(assignment, scanNodeId); @@ -313,4 +318,63 @@ public void testHdfsScanNodeForceScheduleLocal() throws Exception { System.out.printf("%s -> %d bytes\n", entry.getKey(), entry.getValue()); } } + + @Test + public void 
testHdfsScanNodeIncrementalScanRanges() throws Exception { + SessionVariable sessionVariable = new SessionVariable(); + new Expectations() { + { + hdfsScanNode.getId(); + result = scanNodeId; + + hdfsScanNode.getTableName(); + result = "hive_tbl"; + + hiveTable.getTableLocation(); + result = "hdfs://dfs00/dataset/"; + + ConnectContext.get(); + result = context; + + context.getSessionVariable(); + result = sessionVariable; + } + }; + + int scanRangeNumber = 1; + int hostNumber = 3; + List locations = createScanRanges(scanRangeNumber, scanRangeNumber); + FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment(); + ImmutableMap computeNodes = createComputeNodes(hostNumber); + DefaultWorkerProvider workerProvider = new DefaultWorkerProvider( + ImmutableMap.of(), + computeNodes, + ImmutableMap.of(), + computeNodes, + true + ); + + HDFSBackendSelector selector = + new HDFSBackendSelector(hdfsScanNode, locations, assignment, workerProvider, + false, false, true); + selector.computeScanRangeAssignment(); + Assert.assertEquals(assignment.size(), 3); + int scanRanges = 0; + for (Map> scanNodes : assignment.values()) { + Assert.assertEquals(scanNodes.size(), 1); + List scanRangeParams = scanNodes.get(scanNodeId); + Assert.assertTrue(scanRangeParams.size() >= 1); + TScanRangeParams last = scanRangeParams.get(scanRangeParams.size() - 1); + Assert.assertTrue(last.isSetPlaceholder()); + Assert.assertTrue(last.isSetHas_more()); + Assert.assertTrue(last.isPlaceholder()); + Assert.assertTrue(last.has_more == false); + for (TScanRangeParams p : scanRangeParams) { + if (!p.isPlaceholder()) { + scanRanges += 1; + } + } + } + Assert.assertEquals(scanRanges, scanRangeNumber); + } } diff --git a/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/IncrementalDeployHiveTest.java b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/IncrementalDeployHiveTest.java new file mode 100644 index 0000000000000..d350853017923 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/qe/scheduler/IncrementalDeployHiveTest.java @@ -0,0 +1,149 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.qe.scheduler; + +import com.starrocks.qe.DefaultCoordinator; +import com.starrocks.qe.scheduler.dag.FragmentInstanceExecState; +import com.starrocks.qe.scheduler.slot.DeployState; +import com.starrocks.thrift.TExecPlanFragmentParams; +import com.starrocks.thrift.THdfsScanRange; +import com.starrocks.thrift.TPlanFragmentExecParams; +import com.starrocks.thrift.TScanRangeParams; +import com.starrocks.thrift.TUniqueId; +import mockit.Mock; +import mockit.MockUp; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class IncrementalDeployHiveTest extends SchedulerConnectorTestBase { + + @Test + public void testSchedule() throws Exception { + // test different settings. 
+        connectContext.getSessionVariable().setEnableConnectorIncrementalScanRanges(true);
+        {
+            connectContext.getSessionVariable().setConnectorIncrementalScanRangeNumber(20);
+            runSchedule();
+        }
+        {
+            connectContext.getSessionVariable().setConnectorIncrementalScanRangeNumber(1000);
+            runSchedule();
+        }
+        {
+            connectContext.getSessionVariable().setConnectorIncrementalScanRangeNumber(20);
+            connectContext.getSessionVariable().setEnablePhasedScheduler(true);
+            runSchedule();
+        }
+    }
+
+    public void runSchedule() throws Exception {
+        String sql = "select * from hive0.file_split_db.file_split_tbl";
+        List<TExecPlanFragmentParams> requests = new ArrayList<>();
+        int maxScanRangeNumber = connectContext.getSessionVariable().getConnectorIncrementalScanRangeNumber();
+        // deploy
+        new MockUp<Deployer>() {
+            @Mock
+            public void deployFragments(DeployState deployState) {
+                System.out.println("----- deploy fragments ------");
+                final List<List<FragmentInstanceExecState>> state =
+                        deployState.getThreeStageExecutionsToDeploy();
+                int scanRangeNumber = 0;
+                for (List<FragmentInstanceExecState> execStates : state) {
+                    for (FragmentInstanceExecState execState : execStates) {
+                        {
+                            TPlanFragmentExecParams params = execState.getRequestToDeploy().params;
+                            // there is a placeholder
+                            if (!params.getPer_node_scan_ranges().isEmpty()) {
+                                scanRangeNumber += params.getPer_node_scan_rangesSize() - 1;
+                            }
+                            for (List<TScanRangeParams> v : params.getPer_node_scan_ranges().values()) {
+                                for (TScanRangeParams p : v) {
+                                    System.out.println(p + ", " + System.identityHashCode(p));
+                                }
+                            }
+                        }
+                        // instance.node2ScanRanges is shared during incremental deployment.
+                        requests.add(execState.getRequestToDeploy().deepCopy());
+                    }
+                }
+                Assert.assertTrue(scanRangeNumber <= maxScanRangeNumber);
+            }
+        };
+        final DefaultCoordinator coordinator = startScheduling(sql);
+        Assert.assertTrue(coordinator.getJobSpec().isIncrementalScanRanges());
+        Map<TUniqueId, List<TScanRangeParams>> workload = new HashMap<>();
+        for (TExecPlanFragmentParams r : requests) {
+            TPlanFragmentExecParams params = r.params;
+            TUniqueId fragmentInstanceId = params.getFragment_instance_id();
+            if (params.getPer_node_scan_ranges().isEmpty()) {
+                continue;
+            }
+            List<TScanRangeParams> scanRanges = workload.computeIfAbsent(fragmentInstanceId, key -> new
+                    ArrayList<>());
+            for (List<TScanRangeParams> v : params.getPer_node_scan_ranges().values()) {
+                scanRanges.addAll(v);
+            }
+        }
+        // 3 nodes, each node has 1 instance.
+        Assert.assertEquals(workload.size(), 3);
+        Map<String, List<THdfsScanRange>> fileRangesMap = new HashMap<>();
+        for (Map.Entry<TUniqueId, List<TScanRangeParams>> kv : workload.entrySet()) {
+            System.out.println("----- checking fragment: " + kv.getKey() + "-----");
+            List<TScanRangeParams> v = kv.getValue();
+            for (int index = 0; index < v.size(); index++) {
+                TScanRangeParams p = v.get(index);
+                System.out.println(p + ", " + System.identityHashCode(p));
+                if (p.isPlaceholder()) {
+                    if (!p.has_more) {
+                        Assert.assertTrue((index + 1) == v.size());
+                    }
+                } else {
+                    THdfsScanRange sc = p.scan_range.hdfs_scan_range;
+                    String file = sc.relative_path;
+                    List<THdfsScanRange> ranges = fileRangesMap.computeIfAbsent(file, x -> new ArrayList<>());
+                    ranges.add(sc);
+                }
+            }
+        }
+
+        for (Map.Entry<String, List<THdfsScanRange>> kv : fileRangesMap.entrySet()) {
+            System.out.println("----- checking file: " + kv.getKey() + "-----");
+            List<THdfsScanRange> fileRangess = kv.getValue();
+            fileRangess.sort(new Comparator<THdfsScanRange>() {
+                @Override
+                public int compare(THdfsScanRange o1, THdfsScanRange o2) {
+                    return (int) (o1.offset - o2.offset);
+                }
+            });
+            for (int i = 0; i < fileRangess.size(); i++) {
+                THdfsScanRange f = fileRangess.get(i);
+                if (i == 0) {
+                    Assert.assertEquals(f.offset, 0);
+                } else if ((i + 1) == fileRangess.size()) {
+                    Assert.assertEquals(f.offset + f.length, f.file_length);
+                } else {
+                    THdfsScanRange nf = fileRangess.get(i + 1);
+                    Assert.assertEquals((f.offset + f.length), nf.offset);
+                }
+            }
+        }
+    }
+}
diff --git a/gensrc/thrift/InternalService.thrift b/gensrc/thrift/InternalService.thrift
index e5df65904585f..bdafa63d08ae4 100644
--- a/gensrc/thrift/InternalService.thrift
+++ b/gensrc/thrift/InternalService.thrift
@@ -327,6 +327,8 @@ struct TQueryOptions {
 struct TScanRangeParams {
   1: required PlanNodes.TScanRange scan_range
   2: optional i32 volume_id = -1
+  3: optional bool placeholder = false
+  4: optional bool has_more = false;
 }

 // Parameters for a single execution instance of a particular TPlanFragment
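How the pieces fit together: the first deployment wave hands each fragment at most connector_incremental_scan_ranges_size scan ranges (HdfsScanNode.getScanRangeLocations with a non-zero maxScanRangeLength) plus one placeholder TScanRangeParams per compute node whose has_more flag signals whether further ranges will follow. The schedule then keeps asking the coordinator for more ranges until every scan node is drained. A condensed sketch of the loop added to AllAtOnceExecutionSchedule.schedule() (PhasedExecutionSchedule.doDeploy() follows the same pattern; generic types shown for readability):

    List<DeployState> states = new ArrayList<>();
    for (List<ExecutionFragment> executionFragments : dag.getFragmentsInTopologicalOrderFromRoot()) {
        DeployState deployState = deployer.createFragmentExecStates(executionFragments);
        deployer.deployFragments(deployState);        // first wave: initial batch of scan ranges
        states.add(deployState);
    }
    while (true) {
        // returns only the DeployStates whose fragments still have undelivered scan ranges
        states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
        if (states.isEmpty()) {
            break;                                    // no scan node reported hasMoreScanRanges()
        }
        for (DeployState state : states) {
            deployer.deployFragments(state);          // incremental wave
        }
    }

The behavior is off by default; a session opts in with enable_connector_incremental_scan_ranges and can tune the batch size through connector_incremental_scan_ranges_size (default 1000).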