Skip to content

Commit

Permalink
Update search.concurrent.max_slice setting to dynamic cluster setting…
Browse files Browse the repository at this point in the history
… for main with lucene-9.8

Signed-off-by: Sorabh Hamirwasia <[email protected]>
  • Loading branch information
sohami committed Aug 4, 2023
1 parent 9357b61 commit f1bd76a
Show file tree
Hide file tree
Showing 16 changed files with 439 additions and 7 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add getter for path field in NestedQueryBuilder ([#4636](https://github.com/opensearch-project/OpenSearch/pull/4636))
- Allow mmap to use new JDK-19 preview APIs in Apache Lucene 9.4+ ([#5151](https://github.com/opensearch-project/OpenSearch/pull/5151))
- Add events correlation engine plugin ([#6854](https://github.com/opensearch-project/OpenSearch/issues/6854))
- Introduce new dynamic cluster setting to control slice computation for concurrent segment search ([#9107](https://github.com/opensearch-project/OpenSearch/pull/9107))

### Dependencies
- Bump `log4j-core` from 2.18.0 to 2.19.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -677,7 +677,10 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING),
List.of(
SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING
),
List.of(FeatureFlags.TELEMETRY),
List.of(TelemetrySettings.TRACER_ENABLED_SETTING)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -943,4 +943,12 @@ private boolean useConcurrentSearch(Executor concurrentSearchExecutor) {
return false;
}
}

@Override
public int getTargetMaxSliceCount() {
if (!isConcurrentSegmentSearchEnabled()) {
throw new IllegalStateException("Target slice count should not be used when concurrent search is disabled");
}
return clusterService.getClusterSettings().get(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING);
}
}
14 changes: 14 additions & 0 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,20 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
Property.NodeScope
);

// settings to configure maximum slice created per search request using OS custom slice computation mechanism. Default lucene
// mechanism will not be used if this setting is set with value > 0
public static final String CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY = "search.concurrent.max_slice";
public static final int CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE = 0;

// value == 0 means lucene slice computation will be used
public static final Setting<Integer> CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING = Setting.intSetting(
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY,
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE,
Property.Dynamic,
Property.NodeScope
);

public static final int DEFAULT_SIZE = 10;
public static final int DEFAULT_FROM = 0;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@

package org.opensearch.search.internal;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
Expand Down Expand Up @@ -93,11 +95,13 @@
* @opensearch.internal
*/
public class ContextIndexSearcher extends IndexSearcher implements Releasable {

private static final Logger logger = LogManager.getLogger(ContextIndexSearcher.class);
/**
* The interval at which we check for search cancellation when we cannot use
* a {@link CancellableBulkScorer}. See {@link #intersectScorerAndBitSet}.
*/
private static int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;
private static final int CHECK_CANCELLED_SCORER_INTERVAL = 1 << 11;

private AggregatedDfs aggregatedDfs;
private QueryProfiler profiler;
Expand Down Expand Up @@ -443,6 +447,16 @@ public CollectionStatistics collectionStatistics(String field) throws IOExceptio
return collectionStatistics;
}

/**
* Compute the leaf slices that will be used by concurrent segment search to spread work across threads
* @param leaves all the segments
* @return leafSlice group to be executed by different threads
*/
@Override
public LeafSlice[] slices(List<LeafReaderContext> leaves) {
return slicesInternal(leaves, searchContext.getTargetMaxSliceCount());
}

public DirectoryReader getDirectoryReader() {
final IndexReader reader = getIndexReader();
assert reader instanceof DirectoryReader : "expected an instance of DirectoryReader, got " + reader.getClass();
Expand Down Expand Up @@ -522,4 +536,19 @@ private boolean shouldReverseLeafReaderContexts() {
}
return false;
}

// package-private for testing
LeafSlice[] slicesInternal(List<LeafReaderContext> leaves, int targetMaxSlice) {
LeafSlice[] leafSlices;
if (targetMaxSlice == 0) {
// use the default lucene slice calculation
leafSlices = super.slices(leaves);
logger.debug("Slice count using lucene default [{}]", leafSlices.length);
} else {
// use the custom slice calculation based on targetMaxSlice
leafSlices = MaxTargetSliceSupplier.getSlices(leaves, targetMaxSlice);
logger.debug("Slice count using max target slice supplier [{}]", leafSlices.length);
}
return leafSlices;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -564,4 +564,9 @@ public BucketCollectorProcessor bucketCollectorProcessor() {
public boolean isConcurrentSegmentSearchEnabled() {
return in.isConcurrentSegmentSearchEnabled();
}

@Override
public int getTargetMaxSliceCount() {
return in.getTargetMaxSliceCount();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.internal;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.IndexSearcher;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;

/**
* Supplier to compute leaf slices based on passed in leaves and max target slice count to limit the number of computed slices. It sorts
* all the leaves based on document count and then assign each leaf in round-robin fashion to the target slice count slices. Based on
* experiment results as shared in <a href=https://github.com/opensearch-project/OpenSearch/issues/7358>issue-7358</a>
* we can see this mechanism helps to achieve better tail/median latency over default lucene slice computation.
*
* @opensearch.internal
*/
final class MaxTargetSliceSupplier {

static IndexSearcher.LeafSlice[] getSlices(List<LeafReaderContext> leaves, int targetMaxSlice) {
if (targetMaxSlice <= 0) {
throw new IllegalArgumentException("MaxTargetSliceSupplier called with unexpected slice count of " + targetMaxSlice);
}

// slice count should not exceed the segment count
int targetSliceCount = Math.min(targetMaxSlice, leaves.size());

// Make a copy so we can sort:
List<LeafReaderContext> sortedLeaves = new ArrayList<>(leaves);

// Sort by maxDoc, descending:
sortedLeaves.sort(Collections.reverseOrder(Comparator.comparingInt(l -> l.reader().maxDoc())));

final List<List<LeafReaderContext>> groupedLeaves = new ArrayList<>();
for (int i = 0; i < targetSliceCount; ++i) {
groupedLeaves.add(new ArrayList<>());
}
// distribute the slices in round-robin fashion
for (int idx = 0; idx < sortedLeaves.size(); ++idx) {
int currentGroup = idx % targetSliceCount;
groupedLeaves.get(currentGroup).add(sortedLeaves.get(idx));
}

return groupedLeaves.stream().map(IndexSearcher.LeafSlice::new).toArray(IndexSearcher.LeafSlice[]::new);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,4 +471,6 @@ public String toString() {
public abstract void setBucketCollectorProcessor(BucketCollectorProcessor bucketCollectorProcessor);

public abstract BucketCollectorProcessor bucketCollectorProcessor();

public abstract int getTargetMaxSliceCount();
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,4 +335,36 @@ public void testConcurrentSegmentSearchIndexSettings() {
"node"
);
}

public void testMaxSliceCountClusterSettingsForConcurrentSearch() {
// Test that we throw an exception without the feature flag
Settings settings = Settings.builder()
.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), 2)
.build();
SettingsException ex = expectThrows(SettingsException.class, () -> new SettingsModule(settings));
assertTrue(
ex.getMessage()
.contains("unknown setting [" + SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey() + "]")
);

// Test that the settings updates correctly with the feature flag
FeatureFlagSetter.set(FeatureFlags.CONCURRENT_SEGMENT_SEARCH);
int settingValue = randomIntBetween(0, 10);
Settings settingsWithFeatureFlag = Settings.builder()
.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
.build();
SettingsModule settingsModule = new SettingsModule(settingsWithFeatureFlag);
assertEquals(
settingValue,
(int) SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.get(settingsModule.getSettings())
);

// Test that negative value is not allowed
settingValue = -1;
final Settings settingsWithFeatureFlag_2 = Settings.builder()
.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey(), settingValue)
.build();
IllegalArgumentException iae = expectThrows(IllegalArgumentException.class, () -> new SettingsModule(settingsWithFeatureFlag_2));
assertTrue(iae.getMessage().contains(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING.getKey()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.LeafBucketCollector;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.IndexSettingsModule;
Expand All @@ -89,7 +90,9 @@
import java.io.UncheckedIOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand All @@ -99,6 +102,7 @@
import static org.opensearch.search.internal.ExitableDirectoryReader.ExitableTerms;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.instanceOf;
import static org.opensearch.search.internal.IndexReaderUtils.getLeaves;

public class ContextIndexSearcherTests extends OpenSearchTestCase {
public void testIntersectScorerAndRoleBits() throws Exception {
Expand Down Expand Up @@ -303,6 +307,115 @@ public void onRemoval(ShardId shardId, Accountable accountable) {
IOUtils.close(reader, w, dir);
}

public void testSlicesInternal() throws Exception {
final List<LeafReaderContext> leaves = getLeaves(10);

final Directory directory = newDirectory();
IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE));
Document document = new Document();
document.add(new StringField("field1", "value1", Field.Store.NO));
document.add(new StringField("field2", "value1", Field.Store.NO));
iw.addDocument(document);
iw.commit();
DirectoryReader directoryReader = DirectoryReader.open(directory);

SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
ContextIndexSearcher searcher = new ContextIndexSearcher(
directoryReader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null,
searchContext
);
// Case 1: Verify the slice count when lucene default slice computation is used
IndexSearcher.LeafSlice[] slices = searcher.slicesInternal(
leaves,
SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_DEFAULT_VALUE
);
int expectedSliceCount = 2;
// 2 slices will be created since max segment per slice of 5 will be reached
assertEquals(expectedSliceCount, slices.length);
for (int i = 0; i < expectedSliceCount; ++i) {
assertEquals(5, slices[i].leaves.length);
}

// Case 2: Verify the slice count when custom max slice computation is used
expectedSliceCount = 4;
slices = searcher.slicesInternal(leaves, expectedSliceCount);

// 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices
assertEquals(expectedSliceCount, slices.length);
for (int i = 0; i < expectedSliceCount; ++i) {
if (i < 2) {
assertEquals(3, slices[i].leaves.length);
} else {
assertEquals(2, slices[i].leaves.length);
}
}
IOUtils.close(directoryReader, iw, directory);
}

public void testGetSlicesWithNonNullExecutorButCSDisabled() throws Exception {
final List<LeafReaderContext> leaves = getLeaves(10);

final Directory directory = newDirectory();
IndexWriter iw = new IndexWriter(directory, new IndexWriterConfig(new StandardAnalyzer()).setMergePolicy(NoMergePolicy.INSTANCE));
Document document = new Document();
document.add(new StringField("field1", "value1", Field.Store.NO));
document.add(new StringField("field2", "value1", Field.Store.NO));
iw.addDocument(document);
iw.commit();
DirectoryReader directoryReader = DirectoryReader.open(directory);

SearchContext searchContext = mock(SearchContext.class);
IndexShard indexShard = mock(IndexShard.class);
when(searchContext.indexShard()).thenReturn(indexShard);
when(searchContext.bucketCollectorProcessor()).thenReturn(SearchContext.NO_OP_BUCKET_COLLECTOR_PROCESSOR);
when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(false);
ContextIndexSearcher searcher = new ContextIndexSearcher(
directoryReader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
null,
searchContext
);
// Case 1: Verify getSlices return null when concurrent segment search is disabled
assertNull(searcher.getSlices());

// Case 2: Verify the slice count when custom max slice computation is used
searcher = new ContextIndexSearcher(
directoryReader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
true,
mock(ExecutorService.class),
searchContext
);
when(searchContext.isConcurrentSegmentSearchEnabled()).thenReturn(true);
when(searchContext.getTargetMaxSliceCount()).thenReturn(4);
int expectedSliceCount = 4;
IndexSearcher.LeafSlice[] slices = searcher.slices(leaves);

// 4 slices will be created with 3 leaves in first 2 slices and 2 leaves in other slices
assertEquals(expectedSliceCount, slices.length);
for (int i = 0; i < expectedSliceCount; ++i) {
if (i < 2) {
assertEquals(3, slices[i].leaves.length);
} else {
assertEquals(2, slices[i].leaves.length);
}
}
IOUtils.close(directoryReader, iw, directory);
}

private SparseFixedBitSet query(LeafReaderContext leaf, String field, String value) throws IOException {
SparseFixedBitSet sparseFixedBitSet = new SparseFixedBitSet(leaf.reader().maxDoc());
TermsEnum tenum = leaf.reader().terms(field).iterator();
Expand Down
Loading

0 comments on commit f1bd76a

Please sign in to comment.