Skip to content

Commit

Permalink
[BUG] Fix remote shards balancer when filtering throttled nodes (open…
Browse files Browse the repository at this point in the history
…search-project#11724)

* fix remote shards balancer

Signed-off-by: panguixin <[email protected]>

* add change log

Signed-off-by: panguixin <[email protected]>

---------

Signed-off-by: panguixin <[email protected]>
Signed-off-by: Andrew Ross <[email protected]>
Co-authored-by: Andrew Ross <[email protected]>
  • Loading branch information
bugmakerrrrrr and andrross committed Jan 25, 2024
1 parent c1bd752 commit 9f649e0
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 6 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix parsing of single line comments in `lang-painless` ([#11815](https://github.com/opensearch-project/OpenSearch/issues/11815))
- Fix memory leak issue in ReorganizingLongHash ([#11953](https://github.com/opensearch-project/OpenSearch/issues/11953))
- Prevent setting remote_snapshot store type on index creation ([#11867](https://github.com/opensearch-project/OpenSearch/pull/11867))
- [BUG] Fix remote shards balancer when filtering throttled nodes ([#11724](https://github.com/opensearch-project/OpenSearch/pull/11724))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public final class RemoteShardsBalancer extends ShardsBalancer {
private final Logger logger;
private final RoutingAllocation allocation;
private final RoutingNodes routingNodes;
// indicates if there are any nodes being throttled for allocating any unassigned shards
private boolean anyNodesThrottled = false;

public RemoteShardsBalancer(Logger logger, RoutingAllocation allocation) {
this.logger = logger;
Expand Down Expand Up @@ -358,12 +360,16 @@ private void allocateUnassignedReplicas(Queue<RoutingNode> nodeQueue, Map<String
}

private void ignoreRemainingShards(Map<String, UnassignedIndexShards> unassignedShardMap) {
// If any nodes are throttled during allocation, mark all remaining unassigned shards as THROTTLED
final UnassignedInfo.AllocationStatus status = anyNodesThrottled
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
for (UnassignedIndexShards indexShards : unassignedShardMap.values()) {
for (ShardRouting shard : indexShards.getPrimaries()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
for (ShardRouting shard : indexShards.getReplicas()) {
routingNodes.unassigned().ignoreShard(shard, UnassignedInfo.AllocationStatus.DECIDERS_NO, allocation.changes());
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
}
}
}
Expand Down Expand Up @@ -424,11 +430,11 @@ private void allocateUnassignedShards(
private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouting shard) {
boolean allocated = false;
boolean throttled = false;
Set<String> nodesCheckedForShard = new HashSet<>();
int numNodesToCheck = nodeQueue.size();
while (nodeQueue.isEmpty() == false) {
RoutingNode node = nodeQueue.poll();
--numNodesToCheck;
Decision allocateDecision = allocation.deciders().canAllocate(shard, node, allocation);
nodesCheckedForShard.add(node.nodeId());
if (allocateDecision.type() == Decision.Type.YES) {
if (logger.isTraceEnabled()) {
logger.trace("Assigned shard [{}] to [{}]", shardShortSummary(shard), node.nodeId());
Expand Down Expand Up @@ -467,6 +473,10 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}
nodeQueue.offer(node);
} else {
if (nodeLevelDecision.type() == Decision.Type.THROTTLE) {
anyNodesThrottled = true;
}

if (logger.isTraceEnabled()) {
logger.trace(
"Cannot allocate any shard to node: [{}]. Removing from queue. Node level decisions: [{}],[{}]",
Expand All @@ -478,14 +488,14 @@ private void tryAllocateUnassignedShard(Queue<RoutingNode> nodeQueue, ShardRouti
}

// Break out if all nodes in the queue have been checked for this shard
if (nodeQueue.stream().allMatch(rn -> nodesCheckedForShard.contains(rn.nodeId()))) {
if (numNodesToCheck == 0) {
break;
}
}
}

if (allocated == false) {
UnassignedInfo.AllocationStatus status = throttled
UnassignedInfo.AllocationStatus status = (throttled || anyNodesThrottled)
? UnassignedInfo.AllocationStatus.DECIDERS_THROTTLED
: UnassignedInfo.AllocationStatus.DECIDERS_NO;
routingNodes.unassigned().ignoreShard(shard, status, allocation.changes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,11 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return Decision.ALWAYS;
}
}

@Override
public Decision canAllocateAnyShardToNode(RoutingNode node, RoutingAllocation allocation) {
return throttle ? Decision.THROTTLE : Decision.YES;
}
});
Collections.shuffle(deciders, random());
return new AllocationDeciders(deciders);
Expand Down

0 comments on commit 9f649e0

Please sign in to comment.