Add minShards to sharddivide auto
tokee committed Jan 4, 2024
1 parent 7aa0072 commit dc961f0
Showing 5 changed files with 55 additions and 49 deletions.
@@ -49,7 +49,8 @@ public class PropertiesLoader {

// Used by SolrStreamShard
public static final String SOLR_STREAM_SHARD_DIVIDE_PROPERTY = "solr.export.sharddivide.default";
public static final String SOLR_STREAM_SHARD_AUTO_MIN_PROPERTY = "solr.export.sharddivide.autolimit.default";
public static final String SOLR_STREAM_SHARD_AUTO_MIN_SHARDS_PROPERTY = "solr.export.sharddivide.autolimit.shards.default";
public static final String SOLR_STREAM_SHARD_AUTO_MIN_HITS_PROPERTY = "solr.export.sharddivide.autolimit.hits.default";
public static final String SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX_PROPERTY = "solr.export.sharddivide.concurrent.max";

private static final String URL_NORMALISER_PROPERTY="url.normaliser";
@@ -89,6 +90,7 @@ public class PropertiesLoader {

// Used by SolrStreamShard
public static String SOLR_STREAM_SHARD_DIVIDE = "auto";
public static long SOLR_STREAM_SHARD_AUTO_MIN_SHARDS = 2;
public static long SOLR_STREAM_SHARD_AUTO_MIN_HITS = 5000L;
// Maximum number of concurrent shard divided connections, shared between all shard divided calls
public static int SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX = 20;
@@ -131,7 +133,8 @@ public static void initProperties(String propertyFile) {
String timeout = serviceProperties.getProperty(SCREENSHOT_PREVIEW_TIMEOUT_PROPERTY);
URL_NORMALISER = serviceProperties.getProperty(URL_NORMALISER_PROPERTY,"normal");
SOLR_STREAM_SHARD_DIVIDE = serviceProperties.getProperty(SOLR_STREAM_SHARD_DIVIDE_PROPERTY, SOLR_STREAM_SHARD_DIVIDE);
SOLR_STREAM_SHARD_AUTO_MIN_HITS = Long.parseLong(serviceProperties.getProperty(SOLR_STREAM_SHARD_AUTO_MIN_PROPERTY, Long.toString(SOLR_STREAM_SHARD_AUTO_MIN_HITS)));
SOLR_STREAM_SHARD_AUTO_MIN_SHARDS = Long.parseLong(serviceProperties.getProperty(SOLR_STREAM_SHARD_AUTO_MIN_SHARDS_PROPERTY, Long.toString(SOLR_STREAM_SHARD_AUTO_MIN_SHARDS)));
SOLR_STREAM_SHARD_AUTO_MIN_HITS = Long.parseLong(serviceProperties.getProperty(SOLR_STREAM_SHARD_AUTO_MIN_HITS_PROPERTY, Long.toString(SOLR_STREAM_SHARD_AUTO_MIN_HITS)));
SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX = Integer.parseInt(serviceProperties.getProperty(SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX_PROPERTY, Integer.toString(SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX)));

URL waybacksURL = new URL (WAYBACK_BASEURL);
@@ -188,8 +191,9 @@ public static void initProperties(String propertyFile) {
log.info("Property:"+ SOLR_SERVER_CHECK_INTERVAL_PROPERTY +" = " + SOLR_SERVER_CHECK_INTERVAL);
log.info("Property:"+ SOLR_SEARCH_PARAMS_PROPERTY+" loaded map: " + SOLR_PARAMS_MAP);
log.info("Property:"+ SOLR_STREAM_SHARD_DIVIDE_PROPERTY + " = " + SOLR_STREAM_SHARD_DIVIDE);
log.info("Property:" + SOLR_STREAM_SHARD_AUTO_MIN_PROPERTY + " = " + SOLR_STREAM_SHARD_AUTO_MIN_PROPERTY);
log.info("Property:"+ SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX_PROPERTY + " = " + SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX);
log.info("Property:" + SOLR_STREAM_SHARD_AUTO_MIN_SHARDS_PROPERTY + " = " + SOLR_STREAM_SHARD_AUTO_MIN_SHARDS);
log.info("Property:" + SOLR_STREAM_SHARD_AUTO_MIN_HITS_PROPERTY + " = " + SOLR_STREAM_SHARD_AUTO_MIN_HITS);
log.info("Property:" + SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX_PROPERTY + " = " + SOLR_STREAM_SHARD_DIVIDE_CONCURRENT_MAX);
} catch (Exception e) {
e.printStackTrace(); // Acceptable as this is catastrophic
log.error("Could not load property file '" + propertyPath + "'",e);
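
For reference, the shard divide keys defined above could be set in the properties file roughly as follows. This is a sketch only: the values shown are simply the in-code defaults, and the exact properties file a deployment reads them from is not part of this commit.

    # Sketch: shard divide tuning keys from PropertiesLoader, values equal to the code defaults
    solr.export.sharddivide.default=auto
    solr.export.sharddivide.autolimit.shards.default=2
    solr.export.sharddivide.autolimit.hits.default=5000
    solr.export.sharddivide.concurrent.max=20
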
34 changes: 30 additions & 4 deletions src/main/java/dk/kb/netarchivesuite/solrwayback/solr/SRequest.java
@@ -104,6 +104,14 @@ public enum CHOICE {always, never, auto}
public List<String> shards = null;
public String collection = null;
public CHOICE shardDivide = CHOICE.valueOf(PropertiesLoader.SOLR_STREAM_SHARD_DIVIDE);
/**
* If {@link #shardDivide} is {@link CHOICE#auto}, and the number of shards is at least
* {@link #shardDivideAutoMinShards} and the number of hits is above {@link #shardDivideAutoMinHits},
* shard division is activated.
*
* The default value is specified in properties, falling back to 2.
*/
public long shardDivideAutoMinShards = PropertiesLoader.SOLR_STREAM_SHARD_AUTO_MIN_SHARDS;
/**
* If {@link #shardDivide} is {@link CHOICE#auto}, a hit-counting search is performed before the
* full request is initiated. If the number of hits is above {@link #shardDivideAutoMinHits}, shard
@@ -815,9 +823,9 @@ public SRequest shards(List<String> shards) {
* If {@link #shardDivide} is {@link CHOICE#always}, all shards are queried in parallel, subject to
* connection limit rules, and the resulting streams are merged.
* <p>
* If {@link #shardDivide} is {@link CHOICE#auto}, a hit-counting search is performed before the
* full request is initiated. If the number of hits is above {@link #shardDivideAutoMinHits}, shard
* division is activated.
* If {@link #shardDivide} is {@link CHOICE#auto} and the number of shards is at least
* {@link #shardDivideAutoMinShards}, a hit-counting search is performed before the full request is initiated.
* If the number of hits is above {@link #shardDivideAutoMinHits}, shard division is activated.
* @param choice the shardDivide strategy. The default is {@link CHOICE#auto} if not specified in properties.
* @see #shardDivideAutoMinHits(long)
*/
@@ -862,12 +870,28 @@ public SRequest shardDivide(String choice) {
* shard division is activated.
* The default is 5000 if not specified otherwise in properties.
* @see #shardDivide(CHOICE)
* @see #shardDivideAutoMinShards(long)
*/
public SRequest shardDivideAutoMinHits(long minHits) {
this.shardDivideAutoMinHits = minHits;
return this;
}

/**
* If {@link #shardDivide} is {@link CHOICE#auto}, and the number of shards is at least
* {@link #shardDivideAutoMinShards} and the number of hits is above {@link #shardDivideAutoMinHits},
* shard division is activated.
* @param minShards if the shard divide strategy is auto, the number of shards must be at least this before
* shard division is activated.
* The default is 2 if not specified otherwise in properties.
* @see #shardDivide(CHOICE)
* @see #shardDivideAutoMinHits(long)
*/
public SRequest shardDivideAutoMinShards(long minShards) {
this.shardDivideAutoMinShards = minShards;
return this;
}

/**
* Newer Solrs (at least 9+) share a default upper limit of 1024 boolean clauses recursively in the user issued
* query tree. As multi-query uses batching, this limit can quickly be reached. Keep well below 1024.
@@ -1103,7 +1127,9 @@ public String toString() {
"expandResourcesFilterQueries=" + limit(expandResourcesFilterQueries, 20) + ", " +
"collection='" + collection + "'" + ", " +
"shards=" + shards + ", " +
"shardDivide=" + shardDivide;
"shardDivide=" + shardDivide + ", " +
"shardDivideAutoMinShards=" + shardDivideAutoMinShards + ", " +
"shardDivideAutoMinHits=" + shardDivideAutoMinHits;
}

/**
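
As a usage note (not part of the commit itself): the new setter chains like the existing ones, so a caller holding an SRequest could tune the auto thresholds roughly as below. The request variable and the chosen values are illustrative only.

    // Illustrative sketch: 'request' is assumed to be an already constructed SRequest.
    SRequest tuned = request
            .shardDivide("auto")              // strategy; "auto" is the properties-backed default
            .shardDivideAutoMinShards(3)      // at least 3 shards required before dividing (default 2)
            .shardDivideAutoMinHits(10000L);  // hit count must reach 10000 before dividing (default 5000)
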
@@ -104,7 +104,7 @@ public enum PAGING {
group }

/**
* Default page size (rows) for the cursormark paging.
* Default page size (rows) for the cursorMark paging.
*/
public static final int DEFAULT_PAGESIZE = 1000;
/**
@@ -39,9 +39,10 @@ public class SolrStreamFactory {
private static final Logger exportLog = LoggerFactory.getLogger("kb.dk.export");

/**
* Depending on the backing Solr (Cloud) topology, the collection and the {@link SRequest#shardDivide} and
* {@link SRequest#shardDivideAutoMinHits}, either standard collection based document search & delivery or
* shard dividing search & delivery is used to provide an Stream of {@link SolrDocument}s.
* Depending on the backing Solr (Cloud) topology, the collection and the {@link SRequest#shardDivide},
* {@link SRequest#shardDivideAutoMinShards} and {@link SRequest#shardDivideAutoMinHits}, either standard
* collection based document search & delivery or shard dividing search & delivery is used to provide a
* Stream of {@link SolrDocument}s.
* <p>
* Important: This method returns a {@link CollectionUtils.CloseableStream} and the caller <strong>must</strong>
* ensure that it is either depleted or closed after use, to avoid resource leaking. It is highly recommended to
@@ -60,9 +61,10 @@ public static CollectionUtils.CloseableStream<SolrDocument> stream(SRequest requ
}

/**
* Depending on the backing Solr Cloud topology, the collection and the {@link SRequest#shardDivide} and
* {@link SRequest#shardDivideAutoMinHits}, either standard collection based document search & delivery or
* shard dividing search & delivery is used to provide an iterator of {@link SolrDocument}s.
* Depending on the backing Solr (Cloud) topology, the collection and the {@link SRequest#shardDivide},
* {@link SRequest#shardDivideAutoMinShards} and {@link SRequest#shardDivideAutoMinHits}, either standard
* collection based document search & delivery or shard dividing search & delivery is used to provide an
* iterator of {@link SolrDocument}s.
* <p>
* Important: This method returns a {@link CollectionUtils.CloseableIterator}
* and the caller <strong>must</strong> ensure that it is either depleted or closed after use, to avoid resource
@@ -110,10 +112,16 @@ public static CollectionUtils.CloseableIterator<SolrDocument> iterate(SRequest r
return CollectionUtils.CloseableIterator.single(SolrStreamDirect.iterate(request));
}
if (shards.size() == 1) {
log.debug("shardDivide == auto, only 1 shard is specified/available: '{}'. " +
log.debug("shardDivide == auto, but only 1 shard is specified/available: '{}'. " +
"Using collection oriented Solr document streaming", shards.get(0));
return CollectionUtils.CloseableIterator.single(SolrStreamDirect.iterate(request));
}
if (shards.size() < request.shardDivideAutoMinShards) {
log.debug("shardDivide == auto, but only {} shards are specified/available with " +
"shardDivideAutoMinShards = {}. Using collection oriented Solr document streaming",
shards.size(), request.shardDivideAutoMinShards);
return CollectionUtils.CloseableIterator.single(SolrStreamDirect.iterate(request));
}
long hits = SolrStreamShard.getApproximateHits(request);
if (hits < request.shardDivideAutoMinHits) {
log.debug("shardDivide == auto, but approximate hitcount {} is < limit {}. " +
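
Also as a usage note: the javadoc above stresses that the returned stream/iterator must be depleted or closed to avoid resource leaks. Assuming CollectionUtils.CloseableIterator implements Iterator and AutoCloseable, as its name and the close requirement suggest, and reusing the illustrative request from the earlier sketch, consumption could look roughly like this:

    // Sketch under the assumption that CloseableIterator implements Iterator<SolrDocument>
    // and AutoCloseable. Whether delivery is direct or shard divided is decided inside iterate().
    try (CollectionUtils.CloseableIterator<SolrDocument> docs = SolrStreamFactory.iterate(request)) {
        while (docs.hasNext()) {
            SolrDocument doc = docs.next();
            // process doc, e.g. doc.getFieldValue("url")
        }
    }
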
@@ -150,40 +150,8 @@ public void testStageSpeed() {
}
// TODO: Add "minShards"-property to auto

// Sparseness: sha1:G22*
// Collapsing
// big / small fields
// Only docValues / only stored
// Sort

// **** Got 2000 hits in 172,489 ms: 0.01hits/ms for shardDivide=always
// **** Got 2000 hits in 355,839 ms: 0.01hits/ms for shardDivide=never
// **** Got 2000 hits in 150,393 ms: 0.01hits/ms for shardDivide=always
// **** Got 2000 hits in 355,739 ms: 0.01hits/ms for shardDivide=never
// **** Got 4000 hits in 440,551 ms: 0.01hits/ms for shardDivide=always
// **** Got 4000 hits in 702,725 ms: 0.01hits/ms for shardDivide=never
// **** Got 4000 hits in 466,712 ms: 0.01hits/ms for shardDivide=always
// **** Got 4000 hits in 709,237 ms: 0.01hits/ms for shardDivide=never
// .sort("index_time asc")
//**** Got 20000 hits in 13,909 ms: 1437.9 hits/s for shardDivide=always
//**** Got 20000 hits in 83,882 ms: 238.4 hits/s for shardDivide=never
//**** Got 20000 hits in 13,883 ms: 1440.6 hits/s for shardDivide=always
//**** Got 20000 hits in 90,263 ms: 221.6 hits/s for shardDivide=never
// No sort
//**** Got 20000 hits in 93,379 ms: 214.2 hits/s for shardDivide=always
//**** Got 20000 hits in 154,528 ms: 129.4 hits/s for shardDivide=never
//**** Got 20000 hits in 80,361 ms: 248.9 hits/s for shardDivide=always
//**** Got 20000 hits in 147,427 ms: 135.7 hits/s for shardDivide=never
// .sort("index_time asc")
//**** Got 20000 hits in 12,734 ms: 1570.6 hits/s for shardDivide=always
//**** Got 20000 hits in 81,209 ms: 246.3 hits/s for shardDivide=never
//**** Got 20000 hits in 12,574 ms: 1590.6 hits/s for shardDivide=always
//**** Got 20000 hits in 86,156 ms: 232.1 hits/s for shardDivide=never
// .deduplicateField("domain")
//

@Test
public void testShardedSearch() throws IOException {
public void testShardedSearch() {
if (!AVAILABLE) {
return;
}
