Skip to content

Skip can match when batched query execution is available #127471

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cd1b8cd
Skip can_match phase for nodes that support batched query execution
original-brownbear Apr 28, 2025
a106847
cleanup
original-brownbear Apr 28, 2025
163cf13
works better
original-brownbear Apr 28, 2025
2da2b0c
WIP: Skip can_match phase on nodes that support batched query execution
original-brownbear Apr 28, 2025
2ab1d8c
drier
original-brownbear Apr 28, 2025
7f9b9c5
-noise
original-brownbear Apr 28, 2025
a7bdf1f
-noise
original-brownbear Apr 28, 2025
41a9bdb
Merge remote-tracking branch 'elastic/main' into skip-can-match-on-ba…
original-brownbear Apr 28, 2025
9e840c7
-noise
original-brownbear Apr 28, 2025
d59dbe4
Merge branch 'main' into skip-can-match-on-batched
original-brownbear Apr 28, 2025
54a1c1e
Merge branch 'main' into skip-can-match-on-batched
original-brownbear Apr 28, 2025
c311fff
Merge remote-tracking branch 'elastic/main' into skip-can-match-on-ba…
original-brownbear Apr 29, 2025
4a132d2
Merge remote-tracking branch 'elastic' into skip-can-match-on-batched
original-brownbear Apr 29, 2025
853b755
fix test
original-brownbear Apr 29, 2025
cf41bf1
Merge branch 'main' into skip-can-match-on-batched
original-brownbear Apr 30, 2025
8b3ef21
Merge remote-tracking branch 'elastic/main' into skip-can-match-on-ba…
original-brownbear May 1, 2025
83bbe58
fix bwc
original-brownbear May 1, 2025
fdbd51d
Merge remote-tracking branch 'origin/skip-can-match-on-batched' into …
original-brownbear May 1, 2025
48dc5c0
Merge remote-tracking branch 'elastic/main' into skip-can-match-on-ba…
original-brownbear May 1, 2025
756edf5
less change
original-brownbear May 1, 2025
99a094d
[CI] Auto commit changes from spotless
elasticsearchmachine May 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.elasticsearch.search.CanMatchShardResponse;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.MinAndMax;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

Expand All @@ -39,6 +39,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.BiFunction;

Expand Down Expand Up @@ -75,7 +76,10 @@ final class CanMatchPreFilterSearchPhase {
private final FixedBitSet possibleMatches;
private final MinAndMax<?>[] minAndMaxes;
private int numPossibleMatches;
private final CoordinatorRewriteContextProvider coordinatorRewriteContextProvider;
// True if the initiating action to this can_match run is doing batched query phase execution.
// If batched query phase execution is in use, then there is no need to physically send can_match requests to other nodes
// and only the coordinating coordinator can_match logic will run.
private final boolean batchQueryPhase;

private CanMatchPreFilterSearchPhase(
Logger logger,
Expand All @@ -89,7 +93,7 @@ private CanMatchPreFilterSearchPhase(
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider,
boolean batchQueryPhase,
ActionListener<List<SearchShardIterator>> listener
) {
this.logger = logger;
Expand All @@ -103,7 +107,6 @@ private CanMatchPreFilterSearchPhase(
this.aliasFilter = aliasFilter;
this.task = task;
this.requireAtLeastOneMatch = requireAtLeastOneMatch;
this.coordinatorRewriteContextProvider = coordinatorRewriteContextProvider;
this.executor = executor;
final int size = shardsIts.size();
possibleMatches = new FixedBitSet(size);
Expand All @@ -122,6 +125,7 @@ private CanMatchPreFilterSearchPhase(
shardItIndexMap.put(naturalOrder[j], j);
}
this.shardItIndexMap = shardItIndexMap;
this.batchQueryPhase = batchQueryPhase;
}

public static SubscribableListener<List<SearchShardIterator>> execute(
Expand All @@ -130,17 +134,19 @@ public static SubscribableListener<List<SearchShardIterator>> execute(
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter,
Map<String, Float> concreteIndexBoosts,
Executor executor,
SearchRequest request,
List<SearchShardIterator> shardsIts,
TransportSearchAction.SearchTimeProvider timeProvider,
SearchTask task,
boolean requireAtLeastOneMatch,
CoordinatorRewriteContextProvider coordinatorRewriteContextProvider
boolean batchQueryPhase,
SearchService searchService
) {

if (shardsIts.isEmpty()) {
return SubscribableListener.newSucceeded(List.of());
}
ExecutorService executor = searchTransportService.transportService().getThreadPool().executor(ThreadPool.Names.SEARCH_COORDINATION);
final SubscribableListener<List<SearchShardIterator>> listener = new SubscribableListener<>();
// Note that the search is failed when this task is rejected by the executor
executor.execute(new AbstractRunnable() {
Expand All @@ -167,9 +173,9 @@ protected void doRun() {
timeProvider,
task,
requireAtLeastOneMatch,
coordinatorRewriteContextProvider,
batchQueryPhase && searchService.batchQueryPhase(),
listener
).runCoordinatorRewritePhase();
).runCoordinatorRewritePhase(searchService.getCoordinatorRewriteContextProvider(timeProvider::absoluteStartMillis));
}
});
return listener;
Expand All @@ -181,7 +187,7 @@ private static boolean assertSearchCoordinationThread() {

// tries to pre-filter shards based on information that's available to the coordinator
// without having to reach out to the actual shards
private void runCoordinatorRewritePhase() {
private void runCoordinatorRewritePhase(CoordinatorRewriteContextProvider coordinatorRewriteContextProvider) {
// TODO: the index filter (i.e, `_index:patten`) should be prefiltered on the coordinator
assert assertSearchCoordinationThread();
final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
Expand Down Expand Up @@ -304,36 +310,17 @@ protected void doRun() {

var sendingTarget = entry.getKey();
try {
searchTransportService.sendCanMatch(
nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId),
canMatchNodeRequest,
task,
new ActionListener<>() {
@Override
public void onResponse(CanMatchNodeResponse canMatchNodeResponse) {
assert canMatchNodeResponse.getResponses().size() == canMatchNodeRequest.getShardLevelRequests().size();
for (int i = 0; i < canMatchNodeResponse.getResponses().size(); i++) {
CanMatchNodeResponse.ResponseOrFailure response = canMatchNodeResponse.getResponses().get(i);
if (response.getResponse() != null) {
CanMatchShardResponse shardResponse = response.getResponse();
shardResponse.setShardIndex(shardLevelRequests.get(i).getShardRequestIndex());
onOperation(shardResponse.getShardIndex(), shardResponse);
} else {
Exception failure = response.getException();
assert failure != null;
onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex(), failure);
}
}
}

@Override
public void onFailure(Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
}
}
var connection = nodeIdToConnection.apply(sendingTarget.clusterAlias, sendingTarget.nodeId);
if (batchQueryPhase && SearchQueryThenFetchAsyncAction.connectionSupportsBatchedExecution(connection)) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
final int idx = shard.getShardRequestIndex();
CanMatchShardResponse shardResponse = new CanMatchShardResponse(true, null);
shardResponse.setShardIndex(idx);
onOperation(idx, shardResponse);
}
);
} else {
bwcSendCanMatchRequest(connection, canMatchNodeRequest, shardLevelRequests);
}
} catch (Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
Expand All @@ -342,6 +329,38 @@ public void onFailure(Exception e) {
}
}

private void bwcSendCanMatchRequest(
Transport.Connection connection,
CanMatchNodeRequest canMatchNodeRequest,
List<CanMatchNodeRequest.Shard> shardLevelRequests
) {
searchTransportService.sendCanMatch(connection, canMatchNodeRequest, task, new ActionListener<>() {
@Override
public void onResponse(CanMatchNodeResponse canMatchNodeResponse) {
assert canMatchNodeResponse.getResponses().size() == shardLevelRequests.size();
for (int i = 0; i < canMatchNodeResponse.getResponses().size(); i++) {
CanMatchNodeResponse.ResponseOrFailure response = canMatchNodeResponse.getResponses().get(i);
if (response.getResponse() != null) {
CanMatchShardResponse shardResponse = response.getResponse();
shardResponse.setShardIndex(shardLevelRequests.get(i).getShardRequestIndex());
onOperation(shardResponse.getShardIndex(), shardResponse);
} else {
Exception failure = response.getException();
assert failure != null;
onOperationFailed(shardLevelRequests.get(i).getShardRequestIndex(), failure);
}
}
}

@Override
public void onFailure(Exception e) {
for (CanMatchNodeRequest.Shard shard : shardLevelRequests) {
onOperationFailed(shard.getShardRequestIndex(), e);
}
}
});
}

private void onOperation(int idx, CanMatchShardResponse response) {
failedResponses.set(idx, null);
consumeResult(response);
Expand Down Expand Up @@ -461,29 +480,35 @@ private synchronized List<SearchShardIterator> getIterator(List<SearchShardItera
if (shouldSortShards(minAndMaxes) == false) {
return shardsIts;
}
FieldSortBuilder fieldSort = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
return sortShards(shardsIts, minAndMaxes, fieldSort.order());
int[] indexTranslation = sortShards(shardsIts, minAndMaxes, request.source());
List<SearchShardIterator> list = new ArrayList<>(indexTranslation.length);
for (int in : indexTranslation) {
list.add(shardsIts.get(in));
}
return list;
}

private static List<SearchShardIterator> sortShards(List<SearchShardIterator> shardsIts, MinAndMax<?>[] minAndMaxes, SortOrder order) {
public static <T extends Comparable<T>> int[] sortShards(List<T> shardsIts, MinAndMax<?>[] minAndMaxes, SearchSourceBuilder source) {
int bound = shardsIts.size();
List<Integer> toSort = new ArrayList<>(bound);
for (int i = 0; i < bound; i++) {
toSort.add(i);
}
Comparator<? super MinAndMax<?>> keyComparator = forciblyCast(MinAndMax.getComparator(order));
Comparator<? super MinAndMax<?>> keyComparator = forciblyCast(
MinAndMax.getComparator(FieldSortBuilder.getPrimaryFieldSortOrNull(source).order())
);
toSort.sort((idx1, idx2) -> {
int res = keyComparator.compare(minAndMaxes[idx1], minAndMaxes[idx2]);
if (res != 0) {
return res;
}
return shardsIts.get(idx1).compareTo(shardsIts.get(idx2));
});
List<SearchShardIterator> list = new ArrayList<>(bound);
for (Integer integer : toSort) {
list.add(shardsIts.get(integer));
int[] result = new int[bound];
for (int i = 0; i < bound; i++) {
result[i] = toSort.get(i);
}
return list;
return result;
}

private static boolean shouldSortShards(MinAndMax<?>[] minAndMaxes) {
Expand Down
Loading