-
Notifications
You must be signed in to change notification settings - Fork 2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Resource Isolation Feature Checkpoint 4: Extend cache warmup to allow multiple resource isolation groups and multiple replicas #12
Changes from all commits
42a2b62
5a9b98c
7f87cc8
55795e4
6442fac
ec2132d
e3aa09d
6c8fd83
5bcf04f
8376c2c
d682c96
e30a6ef
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,156 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package com.starrocks.qe; | ||
|
||
import com.starrocks.common.DdlException; | ||
import com.starrocks.common.UserException; | ||
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider; | ||
import com.starrocks.planner.ScanNode; | ||
import com.starrocks.qe.scheduler.WorkerProvider; | ||
import com.starrocks.server.GlobalStateMgr; | ||
import com.starrocks.system.SystemInfoService; | ||
import com.starrocks.system.TabletComputeNodeMapper; | ||
import com.starrocks.thrift.TScanRangeLocations; | ||
import com.starrocks.thrift.TScanRangeParams; | ||
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
|
||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.stream.Collectors; | ||
|
||
import static com.starrocks.qe.scheduler.Utils.getOptionalTabletId; | ||
|
||
// This class should only be used in shared data mode. | ||
public class CacheSelectBackendSelector implements BackendSelector { | ||
private static final Logger LOG = LogManager.getLogger(CacheSelectBackendSelector.class); | ||
|
||
// Inputs | ||
private final ScanNode scanNode; | ||
private final List<TScanRangeLocations> locations; | ||
private final CacheSelectComputeNodeSelectionProperties props; | ||
private final long warehouseId; | ||
|
||
// Outputs | ||
private final FragmentScanRangeAssignment assignment; | ||
// This WorkerProvider is used to provide signal to the caller, but not used to select the compute nodes to use. | ||
private final WorkerProvider callerWorkerProvider; | ||
|
||
public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> locations, | ||
FragmentScanRangeAssignment assignment, WorkerProvider callerWorkerProvider, | ||
CacheSelectComputeNodeSelectionProperties props, long warehouseId) { | ||
this.scanNode = scanNode; | ||
this.locations = locations; | ||
this.assignment = assignment; | ||
this.callerWorkerProvider = callerWorkerProvider; | ||
this.props = props; | ||
this.warehouseId = warehouseId; | ||
} | ||
|
||
private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId, | ||
String resourceIsolationGroupId) throws UserException { | ||
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper(); | ||
List<Long> cnIdsOrderedByPreference = | ||
mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId); | ||
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) { | ||
throw new DdlException(String.format("Requesting more replicas than we have available CN" + | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we add a TODO: if requesting more replicas than available, shall we trigger a replica load task, from S3 to compute node There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not totally sure what you mean by "replica load task", that basically describes what is already being executed here. Are you saying we should schedule it for later, if/when we have more compute nodes available? I don't think that's a good approach necessarily. My thinking is this: if we're requesting more replicas than we have compute nodes, we can't fulfill the intention of the statement, so I'm throwing an exception. The other option is to make a best-effort attempt and select all of the available compute nodes. |
||
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d", | ||
props.numReplicasDesired, resourceIsolationGroupId, tabletId)); | ||
} | ||
return new HashSet<>(cnIdsOrderedByPreference); | ||
} | ||
|
||
private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceIsolationGroupId) | ||
throws UserException { | ||
Set<Long> selectedCn = new HashSet<>(); | ||
DefaultSharedDataWorkerProvider workerProvider = | ||
new DefaultSharedDataWorkerProvider.Factory().captureAvailableWorkers(warehouseId, | ||
resourceIsolationGroupId); | ||
long targetBackendId = mainTargetCnId; | ||
while (selectedCn.size() < props.numReplicasDesired) { | ||
if (selectedCn.contains(targetBackendId) || !workerProvider.isDataNodeAvailable(targetBackendId)) { | ||
targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty()); | ||
if (targetBackendId < 0 || selectedCn.contains(targetBackendId)) { | ||
workerProvider.reportDataNodeNotFoundException(); | ||
throw new DdlException(String.format("Requesting more replicas than we have available CN" + | ||
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s", | ||
props.numReplicasDesired, resourceIsolationGroupId)); | ||
} | ||
} | ||
selectedCn.add(targetBackendId); | ||
} | ||
return selectedCn; | ||
} | ||
|
||
@Override | ||
public void computeScanRangeAssignment() throws UserException { | ||
if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) { | ||
throw new UserException("Should not have constructed CacheSelectBackendSelector with no" + | ||
" resourceIsolationGroups specified."); | ||
} | ||
if (props.numReplicasDesired < 1) { | ||
throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired); | ||
} | ||
|
||
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo(); | ||
Set<Long> allSelectedWorkerIds = new HashSet<>(); | ||
for (TScanRangeLocations scanRangeLocations : locations) { | ||
TScanRangeParams scanRangeParams = new TScanRangeParams(scanRangeLocations.scan_range); | ||
Optional<Long> tabletId = getOptionalTabletId(scanRangeLocations.scan_range); | ||
// Try to create assignments for each of the resourceIsolationGroups specified. | ||
for (String resourceIsolationGroupId : props.resourceIsolationGroups) { | ||
Set<Long> selectedCn; | ||
// If we've been provided the relevant tablet id, and we're using resource isolation groups, which | ||
// is when we prefer to use the internal mapping, then we populate the datacaches of the CN which | ||
// are most preferred for the tablet. | ||
if (tabletId.isPresent() && systemInfoService.shouldUseInternalTabletToCnMapper()) { | ||
selectedCn = assignedCnByTabletId(systemInfoService, tabletId.get(), resourceIsolationGroupId); | ||
} else { | ||
if (scanRangeLocations.getLocationsSize() != 1) { | ||
throw new UserException( | ||
"CacheSelectBackendSelector expected to be used in situations where there is exactly" + | ||
" one CN to which any given tablet is officially assigned: " + | ||
scanRangeLocations); | ||
} | ||
selectedCn = | ||
assignedCnByBackupWorker(scanRangeLocations.getLocations().get(0).getBackend_id(), | ||
resourceIsolationGroupId); | ||
} | ||
LOG.debug(String.format( | ||
"done doing assignment for resource isolation group %s, tablet %d, location %s: CN chosen are %s", | ||
resourceIsolationGroupId, | ||
tabletId.orElse(-1L), | ||
scanRangeLocations.getLocations().get(0), | ||
selectedCn.stream().map(String::valueOf).collect(Collectors.joining(",")))); | ||
|
||
for (Long cnId : selectedCn) { | ||
assignment.put(cnId, scanNode.getId().asInt(), scanRangeParams); | ||
allSelectedWorkerIds.add(cnId); | ||
} | ||
} | ||
} | ||
// Note that although we're not using the provided callerWorkerProvider above, the caller assumes that we used | ||
// it to note the selected backend ids. This is used for things like checking if the worker has died | ||
// and cancelling queries. | ||
for (long workerId : allSelectedWorkerIds) { | ||
callerWorkerProvider.selectWorkerUnchecked(workerId); | ||
} | ||
// Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute | ||
// nodes chosen in this function, so we must enable getting any worker regardless of availability. | ||
callerWorkerProvider.setAllowGetAnyWorker(true); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,40 @@ | ||
// Copyright 2021-present StarRocks, Inc. All rights reserved. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// https://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
package com.starrocks.qe; | ||
|
||
import com.starrocks.server.GlobalStateMgr; | ||
|
||
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
// Describes how a CACHE SELECT statement should choose compute nodes to populate with the data. | ||
// Defaults: | ||
// if resource isolation groups are not specified in the CACHE SELECT statement, we assume the request intends to | ||
// populate the data cache for the current FE's resource isolation group. | ||
// If number of replicas is not specified in the CACHE SELECT statement, we assume the request intends to cache 1 replica. | ||
public class CacheSelectComputeNodeSelectionProperties { | ||
public List<String> resourceIsolationGroups; | ||
public int numReplicasDesired; | ||
|
||
public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) { | ||
if (resourceIsolationGroups == null || resourceIsolationGroups.isEmpty()) { | ||
this.resourceIsolationGroups = new ArrayList<>(); | ||
this.resourceIsolationGroups.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf() | ||
.getResourceIsolationGroup()); | ||
} else { | ||
this.resourceIsolationGroups = resourceIsolationGroups; | ||
} | ||
this.numReplicasDesired = Math.max(numReplicasDesired, 1); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we use a more intuitive naming? e.g. getWorkerFromUnmatchingIsolationGroup?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This specific function isn't getting any worker, it's signaling the intent to allow getting any worker.