Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Resource Isolation Feature Checkpoint 4: Extend cache warmup to allow multiple resource isolation groups and multiple replicas #12

Merged
Merged
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
# *.xml line was added by cbrennan
*.xml
# build.sh added by cbrennan
ctbrennan marked this conversation as resolved.
Show resolved Hide resolved
build.sh
*.swp
*.pyc
be/output
Expand Down
3 changes: 2 additions & 1 deletion fe/fe-core/src/main/java/com/starrocks/common/ErrorCode.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,8 @@ public enum ErrorCode {
ERR_WAREHOUSE_SUSPENDED(10003, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s has been suspended."),
ERR_WAREHOUSE_UNAVAILABLE(10004, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse %s is not available."),
ERR_NO_NODES_IN_WAREHOUSE(10005, new byte[] {'4', '2', '0', '0', '0'},
"No alive backend or compute node in warehouse %s."),
"No alive backend or compute node in warehouse %s. Also possible that there are no CN of the " +
"resource isolation group matching the FE."),
ERR_INVALID_WAREHOUSE_NAME(10006, new byte[] {'4', '2', '0', '0', '0'}, "Warehouse name can not be null or empty"),

ERR_NOT_SUPPORTED_STATEMENT_IN_SHARED_NOTHING_MODE(10007, new byte[] {'4', '2', '0', '0', '0'},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,16 @@ public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statem
tmpSessionVariable.setDataCachePriority(statement.getPriority());
tmpSessionVariable.setDatacacheTTLSeconds(statement.getTTLSeconds());
tmpSessionVariable.setEnableCacheSelect(true);
// Note that although setting these values in the SessionVariable is not ideal, it's way more disruptive to pipe
// this information to where it needs to be through the insertStmt.
if (statement.getNumReplicasDesired() > 1) {
// We only set this value if it is larger than the default assumption.
tmpSessionVariable.setNumDesiredDatacacheReplicas(statement.getNumReplicasDesired());
}
if (statement.getResourceIsolationGroups() != null && !statement.getResourceIsolationGroups().isEmpty()) {
// We only set this value if it is the non-default.
tmpSessionVariable.setDatacacheSelectResourceGroups(statement.getResourceIsolationGroups());
}
connectContext.setSessionVariable(tmpSessionVariable);

InsertStmt insertStmt = statement.getInsertStmt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,11 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
int numUsedComputeNodes,
ComputationFragmentSchedulingPolicy computationFragmentSchedulingPolicy,
long warehouseId) {

String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
return captureAvailableWorkers(warehouseId, thisFeResourceIsolationGroup);
}
public DefaultSharedDataWorkerProvider captureAvailableWorkers(long warehouseId, String resourceIsolationGroup) {
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
ImmutableMap.Builder<Long, ComputeNode> builder = ImmutableMap.builder();
List<Long> computeNodeIds = warehouseManager.getAllComputeNodeIds(warehouseId);
Expand All @@ -86,12 +90,12 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService
LOG.debug("idToComputeNode: {}", idToComputeNode);
}

ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode);
ImmutableMap<Long, ComputeNode> availableComputeNodes = filterAvailableWorkers(idToComputeNode,
resourceIsolationGroup);
if (availableComputeNodes.isEmpty()) {
Warehouse warehouse = warehouseManager.getWarehouse(warehouseId);
throw ErrorReportException.report(ErrorCode.ERR_NO_NODES_IN_WAREHOUSE, warehouse.getName());
}

return new DefaultSharedDataWorkerProvider(idToComputeNode, availableComputeNodes);
}
}
Expand All @@ -113,10 +117,10 @@ public DefaultSharedDataWorkerProvider captureAvailableWorkers(SystemInfoService

private final Set<Long> selectedWorkerIds;


@VisibleForTesting
public DefaultSharedDataWorkerProvider(ImmutableMap<Long, ComputeNode> id2ComputeNode,
ImmutableMap<Long, ComputeNode> availableID2ComputeNode
) {
ImmutableMap<Long, ComputeNode> availableID2ComputeNode) {
this.id2ComputeNode = id2ComputeNode;
this.availableID2ComputeNode = availableID2ComputeNode;
this.selectedWorkerIds = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -304,13 +308,12 @@ private static ComputeNode getNextWorker(ImmutableMap<Long, ComputeNode> workers
return workers.values().asList().get(index);
}

private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers) {
String thisFeResourceIsolationGroup = GlobalStateMgr.getCurrentState().
getNodeMgr().getMySelf().getResourceIsolationGroup();
private static ImmutableMap<Long, ComputeNode> filterAvailableWorkers(ImmutableMap<Long, ComputeNode> workers,
String resourceIsolationGroup) {
ImmutableMap.Builder<Long, ComputeNode> builder = new ImmutableMap.Builder<>();
for (Map.Entry<Long, ComputeNode> entry : workers.entrySet()) {
if (entry.getValue().isAlive() && !SimpleScheduler.isInBlocklist(entry.getKey()) &&
resourceIsolationGroupMatches(thisFeResourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
resourceIsolationGroupMatches(resourceIsolationGroup, entry.getValue().getResourceIsolationGroup())) {
builder.put(entry);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.qe;

import com.starrocks.common.DdlException;
import com.starrocks.common.UserException;
import com.starrocks.lake.qe.scheduler.DefaultSharedDataWorkerProvider;
import com.starrocks.planner.ScanNode;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.SystemInfoService;
import com.starrocks.system.TabletComputeNodeMapper;
import com.starrocks.thrift.TScanRangeLocations;
import com.starrocks.thrift.TScanRangeParams;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import static com.starrocks.qe.scheduler.Utils.getOptionalTabletId;

// This class should only be used in shared data mode.
public class CacheSelectBackendSelector implements BackendSelector {
private static final Logger LOG = LogManager.getLogger(NormalBackendSelector.class);

// Inputs
private final ScanNode scanNode;
private final List<TScanRangeLocations> locations;
private final CacheSelectComputeNodeSelectionProperties props;
private final long warehouseId;

// Outputs
private final FragmentScanRangeAssignment assignment;
private final Set<Long> allSelectedWorkerIds;

public CacheSelectBackendSelector(ScanNode scanNode, List<TScanRangeLocations> locations,
FragmentScanRangeAssignment assignment,
CacheSelectComputeNodeSelectionProperties props, long warehouseId) {
this.scanNode = scanNode;
this.locations = locations;
this.assignment = assignment;
this.props = props;
this.warehouseId = warehouseId;
this.allSelectedWorkerIds = new HashSet<>();
}

public Set<Long> getSelectedWorkerIds() {
return allSelectedWorkerIds;
}

private Set<Long> getAssignedCnByTabletId(SystemInfoService systemInfoService, Long tabletId,
String resourceIsolationGroupId) throws UserException {
TabletComputeNodeMapper mapper = systemInfoService.internalTabletMapper();
List<Long> cnIdsOrderedByPreference = mapper.computeNodesForTablet(
tabletId, props.numReplicasDesired, resourceIsolationGroupId);
if (cnIdsOrderedByPreference.size() < props.numReplicasDesired) {
throw new DdlException(String.format("Requesting more replicas than we have available CN" +

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add a TODO: if requesting more replicas than available, shall we trigger a replica load task, from S3 to compute node

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not totally sure what you mean by "replica load task", that basically describes what is already being executed here. Are you saying we should schedule it for later, if/when we have more compute nodes available? I don't think that's a good approach necessarily.

My thinking is this: if we're requesting more replicas than we have compute nodes, we can't fulfill the intention of the statement, so I'm throwing an exception. The other option is to make a best-effort attempt and select all of the available compute nodes.

" for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
props.numReplicasDesired, resourceIsolationGroupId));
}
return new HashSet<>(cnIdsOrderedByPreference);
}

private Set<Long> getAssignedCnByBackupForTargetCn(Long mainTargetCnId, String resourceIsolationGroupId)
ctbrennan marked this conversation as resolved.
Show resolved Hide resolved
throws UserException {
Set<Long> selectedCn = new HashSet<>();
DefaultSharedDataWorkerProvider workerProvider = new DefaultSharedDataWorkerProvider.Factory().
captureAvailableWorkers(warehouseId, resourceIsolationGroupId);
long targetBackendId = mainTargetCnId;
while (selectedCn.size() < props.numReplicasDesired) {
if (selectedCn.contains(targetBackendId) ||
!workerProvider.isDataNodeAvailable(targetBackendId)) {
targetBackendId = workerProvider.selectBackupWorker(targetBackendId, Optional.empty());
if (selectedCn.contains(targetBackendId)) {
workerProvider.reportDataNodeNotFoundException();
throw new DdlException(String.format("Requesting more replicas than we have available CN" +
" for the specified resource group. desiredReplicas: %d, resourceGroup: %s",
props.numReplicasDesired, resourceIsolationGroupId));
}
}
selectedCn.add(targetBackendId);
}
return selectedCn;
}

@Override
public void computeScanRangeAssignment() throws UserException {
if (props.resourceIsolationGroups == null || props.resourceIsolationGroups.isEmpty()) {
throw new UserException("Should not have constructed CacheSelectBackendSelector with no" +
" resourceIsolationGroups specified.");
}
if (props.numReplicasDesired < 1) {
throw new UserException("Num replicas desired in cache must be at least 1: " + props.numReplicasDesired);
}

SystemInfoService systemInfoService =
GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
// Try to create assignments for each of the resourceIsolationGroups specified.
for (TScanRangeLocations scanRangeLocations : locations) {
if (scanRangeLocations.getLocationsSize() != 1) {
throw new UserException("CacheSelectBackendSelector expected to be used in situations where there" +
" is exactly one CN to which any given tablet is officially assigned: " +
scanRangeLocations);
}
TScanRangeParams scanRangeParams = new TScanRangeParams(scanRangeLocations.scan_range);
Optional<Long> tabletId = getOptionalTabletId(scanRangeLocations.scan_range);
for (String resourceIsolationGroupId : props.resourceIsolationGroups) {
Set<Long> selectedCn;
// If we've been provided the relevant tablet id, and we're using resource isolation groups, which
// is when we prefer to use the internal mapping, then we populate the datacaches of the CN which
// are most preferred for the tablet.
if (tabletId.isPresent() && systemInfoService.shouldUseInternalTabletToCnMapper()) {
selectedCn = getAssignedCnByTabletId(systemInfoService, tabletId.get(),
resourceIsolationGroupId);
} else {
selectedCn = getAssignedCnByBackupForTargetCn(
scanRangeLocations.getLocations().get(0).getBackend_id(), resourceIsolationGroupId);
}
for (Long cnId : selectedCn) {
assignment.put(cnId, scanNode.getId().asInt(), scanRangeParams);
allSelectedWorkerIds.add(cnId);
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.qe;

import com.starrocks.server.GlobalStateMgr;

import java.util.ArrayList;
import java.util.List;

// Value holder describing the compute-node selection policy for a CACHE SELECT statement.
//
// Defaulting rules:
// - When no resource isolation groups are specified, the cache warmup targets the resource
//   isolation group of the FE handling the statement.
// - When the requested replica count is absent or below 1, a single cached replica is assumed.
public class CacheSelectComputeNodeSelectionProperties {
    public List<String> resourceIsolationGroups;
    public int numReplicasDesired;

    public CacheSelectComputeNodeSelectionProperties(List<String> resourceIsolationGroups, int numReplicasDesired) {
        // Never allow fewer than one desired replica.
        this.numReplicasDesired = Math.max(1, numReplicasDesired);
        if (resourceIsolationGroups != null && !resourceIsolationGroups.isEmpty()) {
            this.resourceIsolationGroups = resourceIsolationGroups;
        } else {
            // Fall back to the resource isolation group this FE belongs to.
            List<String> defaultGroups = new ArrayList<>();
            defaultGroups.add(GlobalStateMgr.getCurrentState().getNodeMgr().getMySelf()
                    .getResourceIsolationGroup());
            this.resourceIsolationGroups = defaultGroups;
        }
    }
}
19 changes: 19 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/SessionVariable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1662,6 +1662,9 @@ public String getCatalog() {

private boolean enableCacheSelect = false;

private List<String> datacacheSelectResourceGroups = null;
private int numDesiredDatacacheReplicas = -1;

@VariableMgr.VarAttr(name = ENABLE_DYNAMIC_PRUNE_SCAN_RANGE)
private boolean enableDynamicPruneScanRange = true;

Expand Down Expand Up @@ -3978,6 +3981,22 @@ public void setLikePredicateConsolidateMin(int value) {
this.likePredicateConsolidateMin = value;
}

// Resource isolation groups targeted by an in-flight CACHE SELECT; null when unset
// (the field defaults to null, and callers use non-null to detect a cache-select query).
public List<String> getDatacacheSelectResourceGroups() {
return datacacheSelectResourceGroups;
}

// Sets the resource isolation groups a CACHE SELECT should warm. Stores the list as given
// (no defensive copy) — NOTE(review): callers must not mutate it afterwards; confirm.
public void setDatacacheSelectResourceGroups(List<String> datacacheSelectResourceGroups) {
this.datacacheSelectResourceGroups = datacacheSelectResourceGroups;
}

// Number of data cache replicas requested by an in-flight CACHE SELECT; the field defaults
// to -1, which callers treat as "not set".
public int getNumDesiredDatacacheReplicas() {
return numDesiredDatacacheReplicas;
}

// Sets the number of data cache replicas a CACHE SELECT should create. No validation here;
// values are range-checked where the statement is analyzed.
public void setNumDesiredDatacacheReplicas(int numDesiredDatacacheReplicas) {
this.numDesiredDatacacheReplicas = numDesiredDatacacheReplicas;
}

// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.starrocks.planner.ScanNode;
import com.starrocks.planner.SchemaScanNode;
import com.starrocks.qe.BackendSelector;
import com.starrocks.qe.CacheSelectBackendSelector;
import com.starrocks.qe.CacheSelectComputeNodeSelectionProperties;
import com.starrocks.qe.ColocatedBackendSelector;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.FragmentScanRangeAssignment;
Expand Down Expand Up @@ -60,6 +62,9 @@ public static BackendSelector create(ScanNode scanNode,
SessionVariable sessionVariable = connectContext.getSessionVariable();
FragmentScanRangeAssignment assignment = execFragment.getScanRangeAssignment();

int desiredDatacacheReplicas = sessionVariable.getNumDesiredDatacacheReplicas();
List<String> datacacheSelectResourceGroups = sessionVariable.getDatacacheSelectResourceGroups();

if (scanNode instanceof SchemaScanNode) {
return new NormalBackendSelector(scanNode, locations, assignment, workerProvider, false);
} else if (scanNode instanceof HdfsScanNode || scanNode instanceof IcebergScanNode ||
Expand All @@ -69,6 +74,22 @@ public static BackendSelector create(ScanNode scanNode,
return new HDFSBackendSelector(scanNode, locations, assignment, workerProvider,
sessionVariable.getForceScheduleLocal(),
sessionVariable.getHDFSBackendSelectorScanRangeShuffle());
} else if (desiredDatacacheReplicas > 1 || datacacheSelectResourceGroups != null) {
// Note that a cacheSelect should never be hasReplicated (because currently shared-data mode otherwise
// doesn't support multiple replicas in cache), and it should never be hasColocate (because a cache select
// statement is for a single table).
CacheSelectBackendSelector selector = new CacheSelectBackendSelector(
scanNode, locations, assignment, new CacheSelectComputeNodeSelectionProperties(
datacacheSelectResourceGroups, desiredDatacacheReplicas), connectContext.getCurrentWarehouseId());
// We don't pass the WorkerProvider since the CacheSelectBackendSelector will need to create WorkerProviders
// for a different resource isolation group.
// Note that although we're not using the provided workerProvider above, the caller assumes that we used it
// to note the selected backend ids. This is used for things like checking if the worker has died
// and cancelling queries.
for (long workerId : selector.getSelectedWorkerIds()) {
workerProvider.selectWorkerUnchecked(workerId);
}
return selector;
} else {
boolean hasColocate = execFragment.isColocated();
boolean hasBucket = execFragment.isLocalBucketShuffleJoin();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@

import java.time.Duration;
import java.time.format.DateTimeParseException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -164,6 +165,17 @@ public Void visitDataCacheSelectStatement(DataCacheSelectStatement statement, Co
}
statement.setTTLSeconds(ttlSeconds);

statement.setNumReplicasDesired(Integer.parseInt(
properties.getOrDefault("num_replicas", "1")));
if (statement.getNumReplicasDesired() < 1) {
throw new SemanticException("Num replicas must be positive");
}

String resourceIsolationGroupsString = properties.getOrDefault("resource_isolation_groups",
ctbrennan marked this conversation as resolved.
Show resolved Hide resolved
"");
if (!resourceIsolationGroupsString.isEmpty()) {
statement.setResourceIsolationGroups(Arrays.asList(resourceIsolationGroupsString.split(",")));
}
return null;
}
}
Expand Down
Loading