From f1bd76a2880c334dca0d259895b5b3d2e9581084 Mon Sep 17 00:00:00 2001 From: Sorabh Hamirwasia Date: Wed, 2 Aug 2023 17:45:56 -0700 Subject: [PATCH] Update search.concurrent.max_slice setting to dynamic cluster setting for main with lucene-9.8 Signed-off-by: Sorabh Hamirwasia --- CHANGELOG.md | 1 + .../common/settings/ClusterSettings.java | 5 +- .../search/DefaultSearchContext.java | 8 ++ .../org/opensearch/search/SearchService.java | 14 +++ .../search/internal/ContextIndexSearcher.java | 31 ++++- .../internal/FilteredSearchContext.java | 5 + .../internal/MaxTargetSliceSupplier.java | 55 +++++++++ .../search/internal/SearchContext.java | 2 + .../common/settings/SettingsModuleTests.java | 32 +++++ .../internal/ContextIndexSearcherTests.java | 113 ++++++++++++++++++ .../search/internal/IndexReaderUtils.java | 51 ++++++++ .../internal/MaxTargetSliceSupplierTests.java | 77 ++++++++++++ .../search/query/QueryPhaseTests.java | 12 ++ .../test/OpenSearchIntegTestCase.java | 8 +- .../test/OpenSearchSingleNodeTestCase.java | 14 ++- .../opensearch/test/TestSearchContext.java | 18 +++ 16 files changed, 439 insertions(+), 7 deletions(-) create mode 100644 server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java create mode 100644 server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java create mode 100644 server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java diff --git a/CHANGELOG.md b/CHANGELOG.md index e29bbd2da4db5..fcbb1e39ca053 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636)) - Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151)) - Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854)) +- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 802eb7bd01254..f540f332f945b 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -677,7 +677,10 @@ public void apply(Settings value, Settings current, Settings previous) { IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING ), List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH), - List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING), + List.of( + SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING + ), List.of(FeatureFlags.TELEMETRY), List.of(TelemetrySettings.TRACER_ENABLED_SETTING) ); diff --git a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java index d10173184f1c6..e0603cbbf0696 100644 --- a/server/src/main/java/org/opensearch/search/DefaultSearchContext.java +++ b/server/src/main/java/org/opensearch/search/DefaultSearchContext.java @@ -943,4 +943,12 @@ private boolean useConcurrentSearch(Executor concurrentSearchExecutor) { return false; } } + + @Override + public int getTargetMaxSliceCount() { + if (!isConcurrentSegmentSearchEnabled()) { + throw new IllegalStateException("Target slice count should not be used when concurrent search is disabled"); + } + return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING); + } } diff --git a/server/src/main/java/org/opensearch/search/SearchService.java b/server/src/main/java/org/opensearch/search/SearchService.java index b244290e8ae74..ede8b3dfb7495 100644 --- a/server/src/main/java/org/opensearch/search/SearchService.java +++ b/server/src/main/java/org/opensearch/search/SearchService.java @@ -254,6 +254,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv Property.NodeScope ); + // settings to configure maximum slice created per search request using OS custom slice computation mechanism. Default lucene + // mechanism will not be used if this setting is set with value > 0 + public static final String CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY = "search.concurrent.max_slice"; + public static final int CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE = 0; + + // value == 0 means lucene slice computation will be used + public static final Setting CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING = Setting.intSetting( + CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, + CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE, + CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE, + Property.Dynamic, + Property.NodeScope + ); + public static final int DEFAULT_SIZE = 10; public static final int DEFAULT_FROM = 0; diff --git a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java index 5384b47cc69ec..f509ae2ef15fe 100644 --- a/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java +++ b/server/src/main/java/org/opensearch/search/internal/ContextIndexSearcher.java @@ -32,6 +32,8 @@ package org.opensearch.search.internal; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import org.apache.lucene.index.DirectoryReader; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReaderContext; @@ -93,11 +95,13 @@ * @opensearch.internal */ public class ContextIndexSearcher extends IndexSearcher implements Releasable { + + private static final Logger logger = LogManager.getLogger(ContextIndexSearcher.class); /** * The interval at which we check for search cancellation when we cannot use * a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}. */ - private static int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; + private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11; private AggregatedDfs aggregatedDfs; private QueryProfiler profiler; @@ -443,6 +447,16 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio return collectionStatistics; } + /** + * Compute the leaf slices that will be used by concurrent segment search to spread work across threads + * @param leaves all the segments + * @return leafSlice group to be executed by different threads + */ + @Override + public LeafSlice[] slices(List leaves) { + return slicesInternal(leaves, searchContext.getTargetMaxSliceCount()); + } + public DirectoryReader getDirectoryReader() { final IndexReader reader = getIndexReader(); assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass(); @@ -522,4 +536,19 @@ private boolean shouldReverseLeafReaderContexts() { } return false; } + + // package-private for testing + LeafSlice[] slicesInternal(List leaves, int targetMaxSlice) { + LeafSlice[] leafSlices; + if (targetMaxSlice == 0) { + // use the default lucene slice calculation + leafSlices = super.slices(leaves); + logger.debug("Slice count using lucene default [{}]", leafSlices.length); + } else { + // use the custom slice calculation based on targetMaxSlice + leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice); + logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length); + } + return leafSlices; + } } diff --git a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java index 02e6568369e16..5cd25d3b71704 100644 --- a/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/FilteredSearchContext.java @@ -564,4 +564,9 @@ public BucketCollectorProcessor bucketCollectorProcessor() { public boolean isConcurrentSegmentSearchEnabled() { return in.isConcurrentSegmentSearchEnabled(); } + + @Override + public int getTargetMaxSliceCount() { + return in.getTargetMaxSliceCount(); + } } diff --git a/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java new file mode 100644 index 0000000000000..4b20ae6e771ea --- /dev/null +++ b/server/src/main/java/org/opensearch/search/internal/MaxTargetSliceSupplier.java @@ -0,0 +1,55 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * Supplier to compute leaf slices based on passed in leaves and max target slice count to limit the number of computed slices. It sorts + * all the leaves based on document count and then assign each leaf in round-robin fashion to the target slice count slices. Based on + * experiment results as shared in issue-7358 + * we can see this mechanism helps to achieve better tail/median latency over default lucene slice computation. + * + * @opensearch.internal + */ +final class MaxTargetSliceSupplier { + + static IndexSearcher.LeafSlice[] getSlices(List leaves, int targetMaxSlice) { + if (targetMaxSlice <= 0) { + throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + targetMaxSlice); + } + + // slice count should not exceed the segment count + int targetSliceCount = Math.min(targetMaxSlice, leaves.size()); + + // Make a copy so we can sort: + List sortedLeaves = new ArrayList<>(leaves); + + // Sort by maxDoc, descending: + sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc()))); + + final List> groupedLeaves = new ArrayList<>(); + for (int i = 0; i < targetSliceCount; ++i) { + groupedLeaves.add(new ArrayList<>()); + } + // distribute the slices in round-robin fashion + for (int idx = 0; idx < sortedLeaves.size(); ++idx) { + int currentGroup = idx % targetSliceCount; + groupedLeaves.get(currentGroup).add(sortedLeaves.get(idx)); + } + + return groupedLeaves.stream().map(IndexSearcher.LeafSlice::new).toArray(IndexSearcher.LeafSlice[]::new); + } +} diff --git a/server/src/main/java/org/opensearch/search/internal/SearchContext.java b/server/src/main/java/org/opensearch/search/internal/SearchContext.java index c2f81b0d4b8b5..bc2a0658e5a6d 100644 --- a/server/src/main/java/org/opensearch/search/internal/SearchContext.java +++ b/server/src/main/java/org/opensearch/search/internal/SearchContext.java @@ -471,4 +471,6 @@ public String toString() { public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor); public abstract BucketCollectorProcessor bucketCollectorProcessor(); + + public abstract int getTargetMaxSliceCount(); } diff --git a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java index 4490f6b39996f..038347923d459 100644 --- a/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java +++ b/server/src/test/java/org/opensearch/common/settings/SettingsModuleTests.java @@ -335,4 +335,36 @@ public void testConcurrentSegmentSearchIndexSettings() { "node" ); } + + public void testMaxSliceCountClusterSettingsForConcurrentSearch() { + // Test that we throw an exception without the feature flag + Settings settings = Settings.builder() + .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2) + .build(); + SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings)); + assertTrue( + ex.getMessage() + .contains("unknown setting [" + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey() + "]") + ); + + // Test that the settings updates correctly with the feature flag + FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH); + int settingValue = randomIntBetween(0, 10); + Settings settingsWithFeatureFlag = Settings.builder() + .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue) + .build(); + SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag); + assertEquals( + settingValue, + (int) SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.get(settingsModule.getSettings()) + ); + + // Test that negative value is not allowed + settingValue = -1; + final Settings settingsWithFeatureFlag_2 = Settings.builder() + .put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue) + .build(); + IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> new SettingsModule(settingsWithFeatureFlag_2)); + assertTrue(iae.getMessage().contains(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey())); + } } diff --git a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java index 823fc6b463906..05c9e881d8e32 100644 --- a/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java +++ b/server/src/test/java/org/opensearch/search/internal/ContextIndexSearcherTests.java @@ -81,6 +81,7 @@ import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.core.index.shard.ShardId; import org.opensearch.index.shard.IndexShard; +import org.opensearch.search.SearchService; import org.opensearch.search.aggregations.LeafBucketCollector; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.IndexSettingsModule; @@ -89,7 +90,9 @@ import java.io.UncheckedIOException; import java.util.Collections; import java.util.IdentityHashMap; +import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutorService; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -99,6 +102,7 @@ import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.instanceOf; +import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; public class ContextIndexSearcherTests extends OpenSearchTestCase { public void testIntersectScorerAndRoleBits() throws Exception { @@ -303,6 +307,115 @@ public void onRemoval(ShardId shardId, Accountable accountable) { IOUtils.close(reader, w, dir); } + public void testSlicesInternal() throws Exception { + final List leaves = getLeaves(10); + + final Directory directory = newDirectory(); + IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); + Document document = new Document(); + document.add(new StringField("field1", "value1", Field.Store.NO)); + document.add(new StringField("field2", "value1", Field.Store.NO)); + iw.addDocument(document); + iw.commit(); + DirectoryReader directoryReader = DirectoryReader.open(directory); + + SearchContext searchContext = mock(SearchContext.class); + IndexShard indexShard = mock(IndexShard.class); + when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + ContextIndexSearcher searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + null, + searchContext + ); + // Case 1: Verify the slice count when lucene default slice computation is used + IndexSearcher.LeafSlice[] slices = searcher.slicesInternal( + leaves, + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE + ); + int expectedSliceCount = 2; + // 2 slices will be created since max segment per slice of 5 will be reached + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + assertEquals(5, slices[i].leaves.length); + } + + // Case 2: Verify the slice count when custom max slice computation is used + expectedSliceCount = 4; + slices = searcher.slicesInternal(leaves, expectedSliceCount); + + // 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + if (i < 2) { + assertEquals(3, slices[i].leaves.length); + } else { + assertEquals(2, slices[i].leaves.length); + } + } + IOUtils.close(directoryReader, iw, directory); + } + + public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception { + final List leaves = getLeaves(10); + + final Directory directory = newDirectory(); + IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); + Document document = new Document(); + document.add(new StringField("field1", "value1", Field.Store.NO)); + document.add(new StringField("field2", "value1", Field.Store.NO)); + iw.addDocument(document); + iw.commit(); + DirectoryReader directoryReader = DirectoryReader.open(directory); + + SearchContext searchContext = mock(SearchContext.class); + IndexShard indexShard = mock(IndexShard.class); + when(searchContext.indexShard()).thenReturn(indexShard); + when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(false); + ContextIndexSearcher searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + null, + searchContext + ); + // Case 1: Verify getSlices return null when concurrent segment search is disabled + assertNull(searcher.getSlices()); + + // Case 2: Verify the slice count when custom max slice computation is used + searcher = new ContextIndexSearcher( + directoryReader, + IndexSearcher.getDefaultSimilarity(), + IndexSearcher.getDefaultQueryCache(), + IndexSearcher.getDefaultQueryCachingPolicy(), + true, + mock(ExecutorService.class), + searchContext + ); + when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(true); + when(searchContext.getTargetMaxSliceCount()).thenReturn(4); + int expectedSliceCount = 4; + IndexSearcher.LeafSlice[] slices = searcher.slices(leaves); + + // 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + if (i < 2) { + assertEquals(3, slices[i].leaves.length); + } else { + assertEquals(2, slices[i].leaves.length); + } + } + IOUtils.close(directoryReader, iw, directory); + } + private SparseFixedBitSet query(LeafReaderContext leaf, String field, String value) throws IOException { SparseFixedBitSet sparseFixedBitSet = new SparseFixedBitSet(leaf.reader().maxDoc()); TermsEnum tenum = leaf.reader().terms(field).iterator(); diff --git a/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java b/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java new file mode 100644 index 0000000000000..a87bb8a52cdd0 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/internal/IndexReaderUtils.java @@ -0,0 +1,51 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.analysis.standard.StandardAnalyzer; +import org.apache.lucene.document.Document; +import org.apache.lucene.document.Field; +import org.apache.lucene.document.StringField; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexWriter; +import org.apache.lucene.index.IndexWriterConfig; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NoMergePolicy; +import org.apache.lucene.store.Directory; + +import java.util.List; + +import static org.apache.lucene.tests.util.LuceneTestCase.newDirectory; + +public class IndexReaderUtils { + + /** + * Utility to create leafCount number of {@link LeafReaderContext} + * @param leafCount count of leaves to create + * @return created leaves + */ + public static List getLeaves(int leafCount) throws Exception { + final Directory directory = newDirectory(); + IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE)); + for (int i = 0; i < leafCount; ++i) { + Document document = new Document(); + final String fieldValue = "value" + i; + document.add(new StringField("field1", fieldValue, Field.Store.NO)); + document.add(new StringField("field2", fieldValue, Field.Store.NO)); + iw.addDocument(document); + iw.commit(); + } + iw.close(); + DirectoryReader directoryReader = DirectoryReader.open(directory); + List leaves = directoryReader.leaves(); + directoryReader.close(); + directory.close(); + return leaves; + } +} diff --git a/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java new file mode 100644 index 0000000000000..2684cf901f080 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/internal/MaxTargetSliceSupplierTests.java @@ -0,0 +1,77 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.internal; + +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.search.IndexSearcher; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.search.internal.IndexReaderUtils.getLeaves; + +public class MaxTargetSliceSupplierTests extends OpenSearchTestCase { + + public void testSliceCountGreaterThanLeafCount() throws Exception { + int expectedSliceCount = 2; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(expectedSliceCount), 5); + // verify slice count is same as leaf count + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + assertEquals(1, slices[i].leaves.length); + } + } + + public void testNegativeSliceCount() { + assertThrows(IllegalArgumentException.class, () -> MaxTargetSliceSupplier.getSlices(new ArrayList<>(), randomIntBetween(-3, 0))); + } + + public void testSingleSliceWithMultipleLeaves() throws Exception { + int leafCount = randomIntBetween(1, 10); + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(getLeaves(leafCount), 1); + assertEquals(1, slices.length); + assertEquals(leafCount, slices[0].leaves.length); + } + + public void testSliceCountLessThanLeafCount() throws Exception { + int leafCount = 12; + List leaves = getLeaves(leafCount); + + // Case 1: test with equal number of leaves per slice + int expectedSliceCount = 3; + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(leaves, expectedSliceCount); + int expectedLeavesPerSlice = leafCount / expectedSliceCount; + + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + assertEquals(expectedLeavesPerSlice, slices[i].leaves.length); + } + + // Case 2: test with first 2 slice more leaves than others + expectedSliceCount = 5; + slices = MaxTargetSliceSupplier.getSlices(leaves, expectedSliceCount); + int expectedLeavesInFirst2Slice = 3; + int expectedLeavesInOtherSlice = 2; + + assertEquals(expectedSliceCount, slices.length); + for (int i = 0; i < expectedSliceCount; ++i) { + if (i < 2) { + assertEquals(expectedLeavesInFirst2Slice, slices[i].leaves.length); + } else { + assertEquals(expectedLeavesInOtherSlice, slices[i].leaves.length); + } + } + } + + public void testEmptyLeaves() { + IndexSearcher.LeafSlice[] slices = MaxTargetSliceSupplier.getSlices(new ArrayList<>(), 2); + assertEquals(0, slices.length); + } +} diff --git a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java index 61b78905334ec..a2303e6f76c41 100644 --- a/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java +++ b/server/src/test/java/org/opensearch/search/query/QueryPhaseTests.java @@ -1208,6 +1208,12 @@ private static ContextIndexSearcher newContextSearcher(IndexReader reader, Execu IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(executor != null); + if (executor != null) { + when(searchContext.getTargetMaxSliceCount()).thenReturn(randomIntBetween(0, 2)); + } else { + when(searchContext.getTargetMaxSliceCount()).thenThrow(IllegalStateException.class); + } return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), @@ -1225,6 +1231,12 @@ private static ContextIndexSearcher newEarlyTerminationContextSearcher(IndexRead IndexShard indexShard = mock(IndexShard.class); when(searchContext.indexShard()).thenReturn(indexShard); when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR); + when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(executor != null); + if (executor != null) { + when(searchContext.getTargetMaxSliceCount()).thenReturn(randomIntBetween(0, 2)); + } else { + when(searchContext.getTargetMaxSliceCount()).thenThrow(IllegalStateException.class); + } return new ContextIndexSearcher( reader, IndexSearcher.getDefaultSimilarity(), diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 422f6d8dfbe7d..e4e64bed4140e 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -1891,6 +1891,7 @@ private int getNumClientNodes() { * In other words subclasses must ensure this method is idempotent. */ protected Settings nodeSettings(int nodeOrdinal) { + final Settings featureFlagSettings = featureFlagSettings(); Settings.Builder builder = Settings.builder() // Default the watermarks to absurdly low to prevent the tests // from failing on nodes without enough disk space @@ -1907,7 +1908,12 @@ protected Settings nodeSettings(int nodeOrdinal) { .put(SearchService.LOW_LEVEL_CANCELLATION_SETTING.getKey(), randomBoolean()) .putList(DISCOVERY_SEED_HOSTS_SETTING.getKey()) // empty list disables a port scan for other nodes .putList(DISCOVERY_SEED_PROVIDERS_SETTING.getKey(), "file") - .put(featureFlagSettings()); + .put(featureFlagSettings); + if (FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING.get(featureFlagSettings)) { + // By default, for tests we will put the target slice count of 2. This will increase the probability of having multiple slices + // when tests are run with concurrent segment search enabled + builder.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, 2); + } return builder.build(); } diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java index 63b486e32ff5b..59ef50feea62a 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchSingleNodeTestCase.java @@ -68,6 +68,7 @@ import org.opensearch.node.NodeValidationException; import org.opensearch.plugins.Plugin; import org.opensearch.script.MockScriptService; +import org.opensearch.search.SearchService; import org.opensearch.search.internal.SearchContext; import org.opensearch.telemetry.TelemetrySettings; import org.opensearch.test.telemetry.MockTelemetryPlugin; @@ -225,7 +226,8 @@ private Node newNode() { final Path tempDir = createTempDir(); final String nodeName = nodeSettings().get(Node.NODE_NAME_SETTING.getKey(), "node_s_0"); - Settings settings = Settings.builder() + final Settings featureFlagSettings = featureFlagSettings(); + Settings.Builder settingsBuilder = Settings.builder() .put(ClusterName.CLUSTER_NAME_SETTING.getKey(), InternalTestCluster.clusterName("single-node-cluster", random().nextLong())) .put(Environment.PATH_HOME_SETTING.getKey(), tempDir) .put(Environment.PATH_REPO_SETTING.getKey(), tempDir.resolve("repo")) @@ -250,8 +252,12 @@ private Node newNode() { .put(FeatureFlags.TELEMETRY_SETTING.getKey(), true) .put(TelemetrySettings.TRACER_ENABLED_SETTING.getKey(), true) .put(nodeSettings()) // allow test cases to provide their own settings or override these - .put(featureFlagSettings()) - .build(); + .put(featureFlagSettings); + if (FeatureFlags.CONCURRENT_SEGMENT_SEARCH_SETTING.get(featureFlagSettings)) { + // By default, for tests we will put the target slice count of 2. This will increase the probability of having multiple slices + // when tests are run with concurrent segment search enabled + settingsBuilder.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, 2); + } Collection> plugins = getPlugins(); if (plugins.contains(getTestTransportPlugin()) == false) { @@ -263,7 +269,7 @@ private Node newNode() { } plugins.add(MockScriptService.TestPlugin.class); plugins.add(MockTelemetryPlugin.class); - Node node = new MockNode(settings, plugins, forbidPrivateIndexSettings()); + Node node = new MockNode(settingsBuilder.build(), plugins, forbidPrivateIndexSettings()); try { node.start(); } catch (NodeValidationException e) { diff --git a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java index 0ce63fbe2977e..926387c0d148c 100644 --- a/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java +++ b/test/framework/src/main/java/org/opensearch/test/TestSearchContext.java @@ -83,6 +83,8 @@ import java.util.List; import java.util.Map; +import static org.opensearch.test.OpenSearchTestCase.randomIntBetween; + public class TestSearchContext extends SearchContext { public static final SearchShardTarget SHARD_TARGET = new SearchShardTarget( "test", @@ -118,6 +120,7 @@ public class TestSearchContext extends SearchContext { private CollapseContext collapse; protected boolean concurrentSegmentSearchEnabled; private BucketCollectorProcessor bucketCollectorProcessor = NO_OP_BUCKET_COLLECTOR_PROCESSOR; + private int maxSliceCount; /** * Sets the concurrent segment search enabled field @@ -126,6 +129,14 @@ public void setConcurrentSegmentSearchEnabled(boolean concurrentSegmentSearchEna this.concurrentSegmentSearchEnabled = concurrentSegmentSearchEnabled; } + /** + * Sets the maxSliceCount for concurrent search + * @param sliceCount maxSliceCount + */ + public void setMaxSliceCount(int sliceCount) { + this.maxSliceCount = sliceCount; + } + private final Map searchExtBuilders = new HashMap<>(); public TestSearchContext(BigArrays bigArrays, IndexService indexService) { @@ -161,6 +172,7 @@ public TestSearchContext( this.queryShardContext = queryShardContext; this.searcher = searcher; this.concurrentSegmentSearchEnabled = searcher != null && (searcher.getExecutor() != null); + this.maxSliceCount = randomIntBetween(0, 2); this.scrollContext = scrollContext; } @@ -674,6 +686,12 @@ public BucketCollectorProcessor bucketCollectorProcessor() { return bucketCollectorProcessor; } + @Override + public int getTargetMaxSliceCount() { + assert concurrentSegmentSearchEnabled == true : "Please use concurrent search before fetching maxSliceCount"; + return maxSliceCount; + } + /** * Clean the query results by consuming all of it */