Skip to content

Commit

Permalink
Tests
Browse files Browse the repository at this point in the history
Signed-off-by: David Zane <[email protected]>
  • Loading branch information
dzane17 committed Sep 13, 2023
1 parent a95381a commit 9e3177a
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,11 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);

TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);
return new SearchDfsQueryThenFetchAsyncAction(
logger,
null,
Expand All @@ -694,7 +698,7 @@ private SearchDfsQueryThenFetchAsyncAction createSearchDfsQueryThenFetchAsyncAct
searchRequest,
listener,
shardsIter,
null,
timeProvider,
null,
task,
SearchResponse.Clusters.EMPTY,
Expand Down Expand Up @@ -726,6 +730,11 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
);
AtomicReference<Exception> exception = new AtomicReference<>();
ActionListener<SearchResponse> listener = ActionListener.wrap(response -> fail("onResponse should not be called"), exception::set);
TransportSearchAction.SearchTimeProvider timeProvider = new TransportSearchAction.SearchTimeProvider(
0,
System.nanoTime(),
System::nanoTime
);
return new SearchQueryThenFetchAsyncAction(
logger,
null,
Expand All @@ -739,7 +748,7 @@ private SearchQueryThenFetchAsyncAction createSearchQueryThenFetchAsyncAction(
searchRequest,
listener,
shardsIter,
null,
timeProvider,
null,
task,
SearchResponse.Clusters.EMPTY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ public void setupData() {
3,
0,
100,
SearchResponse.PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
pitId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
numSuccess.get(),
0,
0,
SearchResponse.PhaseTook.NULL,
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
SearchResponse.Clusters.EMPTY,
searchContextId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ protected MultiSearchResponse createTestInstance() {
successfulShards,
skippedShards,
tookInMillis,
SearchResponse.PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down Expand Up @@ -96,6 +97,7 @@ private static MultiSearchResponse createTestInstanceWithFailures() {
successfulShards,
skippedShards,
tookInMillis,
SearchResponse.PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
clusters,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,7 @@ public static class TestSearchResponse extends SearchResponse {
final Set<ShardId> queried = new HashSet<>();

TestSearchResponse() {
super(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY, null);
super(InternalSearchResponse.empty(), null, 0, 0, 0, 0L, PhaseTook.NULL, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ public void testRandomVersionSerialization() throws IOException {
Version version = VersionUtils.randomVersion(random());
SearchRequest deserializedRequest = copyWriteable(searchRequest, namedWriteableRegistry, SearchRequest::new, version);
assertEquals(searchRequest.isCcsMinimizeRoundtrips(), deserializedRequest.isCcsMinimizeRoundtrips());
assertEquals(searchRequest.isPhaseTookQueryParamEnabled(), deserializedRequest.isPhaseTookQueryParamEnabled());
assertEquals(searchRequest.getLocalClusterAlias(), deserializedRequest.getLocalClusterAlias());
assertEquals(searchRequest.getAbsoluteStartMillis(), deserializedRequest.getAbsoluteStartMillis());
assertEquals(searchRequest.isFinalReduce(), deserializedRequest.isFinalReduce());
Expand Down Expand Up @@ -244,6 +245,9 @@ private SearchRequest mutate(SearchRequest searchRequest) {
);
mutators.add(() -> mutation.source(randomValueOtherThan(searchRequest.source(), this::createSearchSourceBuilder)));
mutators.add(() -> mutation.setCcsMinimizeRoundtrips(searchRequest.isCcsMinimizeRoundtrips() == false));
mutators.add(
() -> mutation.setPhaseTookQueryParamEnabled(searchRequest.isPhaseTookQueryParamEnabled() == SearchRequest.ParamValue.FALSE)
);
mutators.add(
() -> mutation.setCancelAfterTimeInterval(
searchRequest.getCancelAfterTimeInterval() != null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ public SearchResponse createTestItem(
Boolean terminatedEarly = randomBoolean() ? null : randomBoolean();
int numReducePhases = randomIntBetween(1, 10);
long tookInMillis = randomNonNegativeLong();
SearchResponse.PhaseTook phaseTook = new SearchResponse.PhaseTook(
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong()
);
int totalShards = randomIntBetween(1, Integer.MAX_VALUE);
int successfulShards = randomIntBetween(0, totalShards);
int skippedShards = randomIntBetween(0, totalShards);
Expand Down Expand Up @@ -182,6 +189,7 @@ public SearchResponse createTestItem(
successfulShards,
skippedShards,
tookInMillis,
phaseTook,
shardSearchFailures,
randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY,
null
Expand Down Expand Up @@ -320,6 +328,7 @@ public void testToXContent() {
0,
0,
0,
SearchResponse.PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
null
Expand Down Expand Up @@ -368,13 +377,23 @@ public void testToXContent() {
0,
0,
0,
new SearchResponse.PhaseTook(0, 0, 50, 25, 0),
ShardSearchFailure.EMPTY_ARRAY,
new SearchResponse.Clusters(5, 3, 2)
new SearchResponse.Clusters(5, 3, 2),
null
);
StringBuilder expectedString = new StringBuilder();
expectedString.append("{");
{
expectedString.append("\"took\":0,");
expectedString.append("\"phase_took\":");
{
expectedString.append("{\"dfs_prequery\":0,");
expectedString.append("\"can_match\":0,");
expectedString.append("\"query\":50,");
expectedString.append("\"fetch\":25,");
expectedString.append("\"expand_search\":0},");
}
expectedString.append("\"timed_out\":false,");
expectedString.append("\"_shards\":");
{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.search;

import org.opensearch.test.OpenSearchTestCase;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Phaser;

public class SearchTimeProviderTests extends OpenSearchTestCase {
public void testSearchRequestPhaseFailure() {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);

testRequestStats.onDFSPreQueryPhaseStart(ctx);
testRequestStats.onDFSPreQueryPhaseFailure(ctx);
assertEquals(0, testRequestStats.getDFSPreQueryTotal());

testRequestStats.onCanMatchPhaseStart(ctx);
testRequestStats.onCanMatchPhaseFailure(ctx);
assertEquals(0, testRequestStats.getCanMatchTotal());

testRequestStats.onQueryPhaseStart(ctx);
testRequestStats.onQueryPhaseFailure(ctx);
assertEquals(0, testRequestStats.getQueryTotal());

testRequestStats.onFetchPhaseStart(ctx);
testRequestStats.onFetchPhaseFailure(ctx);
assertEquals(0, testRequestStats.getFetchTotal());

testRequestStats.onExpandSearchPhaseStart(ctx);
testRequestStats.onExpandSearchPhaseFailure(ctx);
assertEquals(0, testRequestStats.getExpandSearchTotal());
}

public void testSearchTimeProvider() {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);

SearchPhaseContext ctx = new MockSearchPhaseContext(1);
long tookTime = randomIntBetween(1, 10);

testRequestStats.onDFSPreQueryPhaseStart(ctx);
testRequestStats.onDFSPreQueryPhaseEnd(ctx, tookTime);
assertEquals(tookTime, testRequestStats.getDFSPreQueryTotal());

testRequestStats.onCanMatchPhaseStart(ctx);
testRequestStats.onCanMatchPhaseEnd(ctx, tookTime);
assertEquals(tookTime, testRequestStats.getCanMatchTotal());

testRequestStats.onQueryPhaseStart(ctx);
testRequestStats.onQueryPhaseEnd(ctx, tookTime);
assertEquals(tookTime, testRequestStats.getQueryTotal());

testRequestStats.onFetchPhaseStart(ctx);
testRequestStats.onFetchPhaseEnd(ctx, tookTime);
testRequestStats.onFetchPhaseEnd(ctx, 10);
assertEquals(tookTime + 10, testRequestStats.getFetchTotal());

testRequestStats.onExpandSearchPhaseStart(ctx);
testRequestStats.onExpandSearchPhaseEnd(ctx, tookTime);
testRequestStats.onExpandSearchPhaseEnd(ctx, tookTime);
assertEquals(2 * tookTime, testRequestStats.getExpandSearchTotal());
}

public void testSearchTimeProviderOnDFSPreQueryEndConcurrently() throws InterruptedException {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);
int numTasks = randomIntBetween(5, 50);
long tookTime = randomIntBetween(1, 10);
Thread[] threads = new Thread[numTasks];
Phaser phaser = new Phaser(numTasks + 1);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
testRequestStats.onDFSPreQueryPhaseEnd(ctx, tookTime);
countDownLatch.countDown();
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await();
assertEquals(tookTime * numTasks, testRequestStats.getDFSPreQueryTotal());
}

public void testSearchTimeProviderOnCanMatchQueryEndConcurrently() throws InterruptedException {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);
int numTasks = randomIntBetween(5, 50);
long tookTime = randomIntBetween(1, 10);
Thread[] threads = new Thread[numTasks];
Phaser phaser = new Phaser(numTasks + 1);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
testRequestStats.onCanMatchPhaseEnd(ctx, tookTime);
countDownLatch.countDown();
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await();
assertEquals(tookTime * numTasks, testRequestStats.getCanMatchTotal());
}

public void testSearchTimeProviderOnQueryEndConcurrently() throws InterruptedException {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);
int numTasks = randomIntBetween(5, 50);
long tookTime = randomIntBetween(1, 10);
Thread[] threads = new Thread[numTasks];
Phaser phaser = new Phaser(numTasks + 1);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
testRequestStats.onQueryPhaseEnd(ctx, tookTime);
countDownLatch.countDown();
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await();
assertEquals(tookTime * numTasks, testRequestStats.getQueryTotal());
}

public void testSearchTimeProviderOnFetchEndConcurrently() throws InterruptedException {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);
int numTasks = randomIntBetween(5, 50);
long tookTime = randomIntBetween(1, 10);
Thread[] threads = new Thread[numTasks];
Phaser phaser = new Phaser(numTasks + 1);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
testRequestStats.onFetchPhaseEnd(ctx, tookTime);
countDownLatch.countDown();
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await();
assertEquals(tookTime * numTasks, testRequestStats.getFetchTotal());
}

public void testSearchTimeProviderOnExpandSearchEndConcurrently() throws InterruptedException {
TransportSearchAction.SearchTimeProvider testRequestStats = new TransportSearchAction.SearchTimeProvider(0, 0, () -> 0);
SearchPhaseContext ctx = new MockSearchPhaseContext(1);
int numTasks = randomIntBetween(5, 50);
long tookTime = randomIntBetween(1, 10);
Thread[] threads = new Thread[numTasks];
Phaser phaser = new Phaser(numTasks + 1);
CountDownLatch countDownLatch = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
threads[i] = new Thread(() -> {
phaser.arriveAndAwaitAdvance();
testRequestStats.onExpandSearchPhaseEnd(ctx, tookTime);
countDownLatch.countDown();
});
threads[i].start();
}
phaser.arriveAndAwaitAdvance();
countDownLatch.await();
assertEquals(tookTime * numTasks, testRequestStats.getExpandSearchTotal());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,18 @@ private static SearchResponse emptySearchResponse() {
null,
1
);
return new SearchResponse(response, null, 1, 1, 0, 100, ShardSearchFailure.EMPTY_ARRAY, SearchResponse.Clusters.EMPTY, null);
return new SearchResponse(
response,
null,
1,
1,
0,
100,
SearchResponse.PhaseTook.NULL,
ShardSearchFailure.EMPTY_ARRAY,
SearchResponse.Clusters.EMPTY,
null
);
}

public void testCCSRemoteReduceMergeFails() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public SearchResponse createTestItem(
successfulShards,
skippedShards,
tookInMillis,
SearchResponse.PhaseTook.NULL,
shardSearchFailures,
randomBoolean() ? randomClusters() : SearchResponse.Clusters.EMPTY,
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ public static SearchRequest randomSearchRequest(Supplier<SearchSourceBuilder> ra
if (randomBoolean()) {
searchRequest.setCancelAfterTimeInterval(TimeValue.parseTimeValue(randomTimeValue(), null, "cancel_after_time_interval"));
}
if (randomBoolean()) {
searchRequest.setPhaseTookQueryParamEnabled(randomBoolean());
}
return searchRequest;
}

Expand Down

0 comments on commit 9e3177a

Please sign in to comment.