Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource Isolation Feature Checkpoint 4: Extend cache warmup to allow multiple resource isolation groups and multiple replicas #12

Merged
Merged
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public enum ErrorCode {
ERR_WAREHOUSE_SUSPENDED(10003, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s has been suspended."),
ERR_WAREHOUSE_UNAVAILABLE(10004, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s is not available."),
ERR_NO_NODES_IN_WAREHOUSE(10005, new byte[] {'4', '2', '0', '0', '0'},
"No alive backend or compute node in warehouse %s."),
"No alive backend or compute node in warehouse %s. Also possible that there are no CN of the " +
"resource isolation group matching the FE."),
ERR_INVALID_WAREHOUSE_NAME(10006, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse name can not be null or empty"),

ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE(10007, new byte[] {'4', '2', '0', '0', '0'},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
tmpSessionVariable.setDataCachePriority(statement.getPriority());
tmpSessionVariable.setDatacacheTTLSeconds(statement.getTTLSeconds());
tmpSessionVariable.setEnableCacheSelect(true);
// Note that although setting these values in the SessionVariable is not ideal, it's way more disruptive to pipe
// this information to where it needs to be through the insertStmt.
if (statement.getNumReplicasDesired() > 1) {
// We only set this value if it is larger than the default assumption.
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
}
if (statement.getResourceIsolationGroups() != null && !statement.getResourceIsolationGroups().isEmpty()) {
// We only set this value if it is the non-default.
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
}
connectContext.setSessionVariable(tmpSessionVariable);

InsertStmt insertStmt = statement.getInsertStmt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
int numUsedComputeNodes,
ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy,
long warehouseId) {

String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
return captureAvailableWorkers(warehouseId, thisFeResourceIsolationGroup);
}
public DefaultSharedDataWorkerProvider captureAvailableWorkers(long warehouseId, String resourceIsolationGroup) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
ImmutableMap.Builder<Long, ComputeNode> builder = ImmutableMap.builder();
List<Long> computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId);
Expand All @@ -86,12 +90,12 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
LOG.debug("idToComputeNode: {}", idToComputeNode);
}

ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode);
ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode,
resourceIsolationGroup);
if (availableComputeNodes.isEmpty()) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}

return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes);
}
}
Expand All @@ -113,10 +117,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService

private final Set<Long> selectedWorkerIds;

private boolean allowGetAnyWorker = false;

@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
ImmutableMap<Long, ComputeNode> availableID2ComputeNode
) {
ImmutableMap<Long, ComputeNode> availableID2ComputeNode) {
this.id2ComputeNode = id2ComputeNode;
this.availableID2ComputeNode = availableID2ComputeNode;
this.selectedWorkerIds = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -158,9 +163,33 @@ public Collection<ComputeNode> getAllWorkers() {
return availableID2ComputeNode.values();
}

// Functionality: If allowGetAnyWorker is set, then getWorkerById will use global state to get a ComputeNode
// reference instead of its available node map.
// Where it's used: As of writing this, this is only used for CACHE SELECT statements.
// Why: This is necessary because FragmentAssignmentStrategy classes, Coordinator, and CoordinatorPreprocessors use
// the WorkerProvider interface to get ComputeNode references. During a CACHE SELECT execution (which is the only
// type of statement that should be selecting workers from an un-matching resource isolation group), they will need
// to get the ComputeNode addresses for nodes outside the leader group.
// Alternative considered: Another way of achieving this would be to allow any caller to get a ComputeNode reference
// for any compute node, but that would leave space for bugs during the execution of other types of statements which
// really shouldn't be getting ComputeNode references for un-matching resource isolation groups or unhealthy
// ComputeNodes. Instead of changing a bunch of code which uses the WorkerProvider in a specific way, this way
// limits scope to only change behavior when the user of the WorkerProvider sets this very specific option.
public void setAllowGetAnyWorker(boolean allowGetAnyWorker) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use a more intuitive name? e.g. getWorkerFromUnmatchingIsolationGroup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This specific function isn't getting any worker, it's signaling the intent to allow getting any worker.

this.allowGetAnyWorker = allowGetAnyWorker;
}

@Override
public ComputeNode getWorkerById(long workerId) {
return availableID2ComputeNode.get(workerId);
ComputeNode cn = availableID2ComputeNode.get(workerId);
if (cn == null && allowGetAnyWorker) {
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
cn = systemInfoService.getBackendOrComputeNode(workerId);
if (cn == null) {
LOG.warn(String.format("could not get worker by id: %s", workerId));
}
}
return cn;
}

@Override
Expand Down Expand Up @@ -304,13 +333,13 @@ private static ComputeNode getNextWorker(ImmutableMap<Long, ComputeNode> workers
return workers.values().asList().get(index);
}

private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers) {
String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers,
String resourceIsolationGroup) {
ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
resourceIsolationGroupMatches(thisFeResourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
resourceIsolationGroupMatches(resourceIsolationGroup,
entry.getValue().getResourceIsolationGroup())) {
builder.put(entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider;
import com.starrocks.planner.ScanNode;
import com.starrocks.qe.scheduler.WorkerProvider;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.SystemInfoService;
import com.starrocks.system.TabletComputeNodeMapper;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TScanRangeParams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static com.starrocks.qe.scheduler.Utils.getOptionalTabletId;

// This class should only be used in shared data mode.
public class CacheSelectBackendSelector implements BackendSelector {
private static final Logger LOG = LogManager.getLogger(CacheSelectBackendSelector.class);

// Inputs
private final ScanNode scanNode;
private final List<TScanRangeLocations> locations;
private final CacheSelectComputeNodeSelectionProperties props;
private final long warehouseId;

// Outputs
private final FragmentScanRangeAssignment assignment;
// This WorkerProvider is used to provide signal to the caller, but not used to select the compute nodes to use.
private final WorkerProvider callerWorkerProvider;

/**
 * Builds a selector that assigns scan ranges to compute nodes for a CACHE SELECT statement.
 *
 * @param scanNode             scan node whose id is recorded with every assignment entry
 * @param locations            scan ranges that need to be assigned
 * @param assignment           output structure that receives the chosen (compute node, scan range) pairs
 * @param callerWorkerProvider provider owned by the caller; it is only told which workers were chosen
 * @param props                resource isolation groups and replica count requested by the statement
 * @param warehouseId          warehouse whose compute nodes are captured when backup workers are needed
 */
public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> locations,
        FragmentScanRangeAssignment assignment, WorkerProvider callerWorkerProvider,
        CacheSelectComputeNodeSelectionProperties props, long warehouseId) {
    // Inputs.
    this.scanNode = scanNode;
    this.locations = locations;
    this.props = props;
    this.warehouseId = warehouseId;

    // Outputs.
    this.assignment = assignment;
    this.callerWorkerProvider = callerWorkerProvider;
}

private Set<Long> assignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
String resourceIsolationGroupId) throws UserException {
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
List<Long> cnIdsOrderedByPreference =
mapper.computeNodesForTablet(tabletId, props.numReplicasDesired, resourceIsolationGroupId);
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
throw new DdlException(String.format("Requesting more replicas than we have available CN" +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we add a TODO here? If more replicas are requested than are available, shall we trigger a replica load task from S3 to the compute node?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not totally sure what you mean by "replica load task", that basically describes what is already being executed here. Are you saying we should schedule it for later, if/when we have more compute nodes available? I don't think that's a good approach necessarily.

My thinking is this: if we're requesting more replicas than we have compute nodes, we can't fulfill the intention of the statement, so I'm throwing an exception. The other option is to make a best-effort attempt and select all of the available compute nodes.

" for the specified resource group. desiredReplicas: %d, resourceGroup: %s, tabletId: %d",
props.numReplicasDesired, resourceIsolationGroupId, tabletId));
}
return new HashSet<>(cnIdsOrderedByPreference);
}

/**
 * Chooses {@code props.numReplicasDesired} distinct compute nodes from one resource isolation group,
 * starting with the officially assigned node and walking through backup workers after it.
 *
 * @param mainTargetCnId           id of the compute node the scan range is officially assigned to
 * @param resourceIsolationGroupId group whose workers are captured and selected from
 * @return the ids of the chosen compute nodes
 * @throws UserException if the group cannot supply enough distinct, available compute nodes
 */
private Set<Long> assignedCnByBackupWorker(Long mainTargetCnId, String resourceIsolationGroupId)
        throws UserException {
    // A dedicated provider restricted to the requested group; the caller's provider is not used here.
    DefaultSharedDataWorkerProvider groupWorkers = new DefaultSharedDataWorkerProvider.Factory()
            .captureAvailableWorkers(warehouseId, resourceIsolationGroupId);

    Set<Long> chosenCnIds = new HashSet<>();
    long candidateId = mainTargetCnId;
    while (chosenCnIds.size() < props.numReplicasDesired) {
        boolean candidateUsable =
                !chosenCnIds.contains(candidateId) && groupWorkers.isDataNodeAvailable(candidateId);
        if (!candidateUsable) {
            // Move on to the backup of the current candidate.
            candidateId = groupWorkers.selectBackupWorker(candidateId, Optional.empty());
            boolean outOfCandidates = candidateId < 0 || chosenCnIds.contains(candidateId);
            if (outOfCandidates) {
                // No backup exists, or the backup is one we already picked: not enough nodes.
                groupWorkers.reportDataNodeNotFoundException();
                throw new DdlException(String.format("Requesting more replicas than we have available CN" +
                                " for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
                        props.numReplicasDesired, resourceIsolationGroupId));
            }
        }
        chosenCnIds.add(candidateId);
    }
    return chosenCnIds;
}

/**
 * Assigns every scan range to compute nodes in each requested resource isolation group.
 *
 * <p>For each scan range and each group, the target compute nodes come from the internal
 * tablet-to-CN mapper when a tablet id is present and that mapper is in use. Otherwise they are
 * derived from the single officially assigned location plus backup workers. Every chosen node
 * is written to {@code assignment} and afterwards reported to {@code callerWorkerProvider}.
 *
 * @throws UserException if no resource isolation group is configured, if fewer than one replica
 *                       is requested, if a scan range without a usable tablet mapping does not
 *                       have exactly one location, or if a helper cannot find enough compute nodes
 */
@Override
public void computeScanRangeAssignment() throws UserException {
    if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) {
        throw new UserException("Should not have constructed CacheSelectBackendSelector with no" +
                " resourceIsolationGroups specified.");
    }
    if (props.numReplicasDesired < 1) {
        throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired);
    }

    SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
    Set<Long> allSelectedWorkerIds = new HashSet<>();
    for (TScanRangeLocations scanRangeLocations : locations) {
        TScanRangeParams scanRangeParams = new TScanRangeParams(scanRangeLocations.scan_range);
        Optional<Long> tabletId = getOptionalTabletId(scanRangeLocations.scan_range);
        // Try to create assignments for each of the resourceIsolationGroups specified.
        for (String resourceIsolationGroupId : props.resourceIsolationGroups) {
            Set<Long> selectedCn;
            // If we've been provided the relevant tablet id, and we're using resource isolation groups, which
            // is when we prefer to use the internal mapping, then we populate the datacaches of the CN which
            // are most preferred for the tablet.
            if (tabletId.isPresent() && systemInfoService.shouldUseInternalTabletToCnMapper()) {
                selectedCn = assignedCnByTabletId(systemInfoService, tabletId.get(), resourceIsolationGroupId);
            } else {
                if (scanRangeLocations.getLocationsSize() != 1) {
                    throw new UserException(
                            "CacheSelectBackendSelector expected to be used in situations where there is exactly" +
                                    " one CN to which any given tablet is officially assigned: " +
                                    scanRangeLocations);
                }
                selectedCn =
                        assignedCnByBackupWorker(scanRangeLocations.getLocations().get(0).getBackend_id(),
                                resourceIsolationGroupId);
            }
            // Only build the log arguments when debug logging is on. The tablet-id branch above never
            // checks that a location exists, so read the first location only when there is one;
            // a log statement must not be able to fail the assignment.
            if (LOG.isDebugEnabled()) {
                LOG.debug("done doing assignment for resource isolation group {}, tablet {}, location {}:" +
                                " CN chosen are {}",
                        resourceIsolationGroupId,
                        tabletId.orElse(-1L),
                        scanRangeLocations.getLocationsSize() > 0 ? scanRangeLocations.getLocations().get(0) : null,
                        selectedCn.stream().map(String::valueOf).collect(Collectors.joining(",")));
            }

            for (Long cnId : selectedCn) {
                assignment.put(cnId, scanNode.getId().asInt(), scanRangeParams);
                allSelectedWorkerIds.add(cnId);
            }
        }
    }
    // Note that although we're not using the provided callerWorkerProvider above, the caller assumes that we used
    // it to note the selected backend ids. This is used for things like checking if the worker has died
    // and cancelling queries.
    for (long workerId : allSelectedWorkerIds) {
        callerWorkerProvider.selectWorkerUnchecked(workerId);
    }
    // Also, caller upstream will use the workerProvider to get ComputeNode references corresponding to the compute
    // nodes chosen in this function, so we must enable getting any worker regardless of availability.
    callerWorkerProvider.setAllowGetAnyWorker(true);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.qe;

import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.List;

/**
 * Describes which compute nodes a CACHE SELECT statement should populate with data.
 *
 * <p>Defaults applied by the constructor:
 * <ul>
 *   <li>When no resource isolation groups are given, the current FE's resource isolation group is
 *       used as the only target.</li>
 *   <li>When the requested replica count is below 1, a single replica is assumed.</li>
 * </ul>
 */
public class CacheSelectComputeNodeSelectionProperties {
    public List<String> resourceIsolationGroups;
    public int numReplicasDesired;

    /**
     * @param resourceIsolationGroups groups named in the statement; {@code null} or empty means "this FE's group"
     * @param numReplicasDesired      replica count named in the statement; values below 1 are raised to 1
     */
    public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) {
        boolean groupsSpecified = resourceIsolationGroups != null && !resourceIsolationGroups.isEmpty();
        if (groupsSpecified) {
            // The caller's list is stored as-is (no copy).
            this.resourceIsolationGroups = resourceIsolationGroups;
        } else {
            List<String> onlyThisFeGroup = new ArrayList<>();
            onlyThisFeGroup.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf()
                    .getResourceIsolationGroup());
            this.resourceIsolationGroups = onlyThisFeGroup;
        }
        this.numReplicasDesired = numReplicasDesired < 1 ? 1 : numReplicasDesired;
    }
}
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,9 @@ public String getCatalog() {

private boolean enableCacheSelect = false;

private List<String> datacacheSelectResourceGroups = null;
private int numDesiredDatacacheReplicas = -1;

@VariableMgr.VarAttr(name = ENABLE_DYNAMIC_PRUNE_SCAN_RANGE)
private boolean enableDynamicPruneScanRange = true;

Expand Down Expand Up @@ -3978,6 +3981,22 @@ public void setLikePredicateConsolidateMin(int value) {
this.likePredicateConsolidateMin = value;
}

/**
 * Returns the resource isolation groups stored for a CACHE SELECT, or {@code null} when none
 * have been set (the field's initial value).
 */
public List<String> getDatacacheSelectResourceGroups() {
return datacacheSelectResourceGroups;
}

/**
 * Stores the resource isolation groups for a CACHE SELECT. The list reference is kept as-is.
 */
public void setDatacacheSelectResourceGroups(List<String> datacacheSelectResourceGroups) {
this.datacacheSelectResourceGroups = datacacheSelectResourceGroups;
}

/**
 * Returns the number of datacache replicas desired, or {@code -1} when it has never been set
 * (the field's initial value).
 */
public int getNumDesiredDatacacheReplicas() {
return numDesiredDatacacheReplicas;
}

/**
 * Stores the number of datacache replicas desired. No validation is performed here.
 */
public void setNumDesiredDatacacheReplicas(int numDesiredDatacacheReplicas) {
this.numDesiredDatacacheReplicas = numDesiredDatacacheReplicas;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ WorkerProvider captureAvailableWorkers(SystemInfoService systemInfoService,

Collection<ComputeNode> getAllWorkers();

/**
 * Makes it so getWorkerById is not restricted by which nodes are "available"/of the same
 * resource isolation group.
 *
 * <p>This default implementation does not support the option and always throws; implementations
 * that support it must override this method.
 *
 * @param allowGetAnyWorker whether getWorkerById may return workers outside the available set
 * @throws UnsupportedOperationException always, unless overridden
 */
default void setAllowGetAnyWorker(boolean allowGetAnyWorker) throws UnsupportedOperationException {
throw new UnsupportedOperationException("setAllowGetAnyWorker is not supported.");
}

ComputeNode getWorkerById(long workerId);

boolean isDataNodeAvailable(long dataNodeId);
Expand Down
Loading