Skip to content

Commit

Permalink
HSEARCH-2945 Allow skipping total count calls in a massindexer
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 10, 2024
1 parent a8b7acb commit fc702c6
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ void simple() {
// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
Expand Down Expand Up @@ -128,6 +128,67 @@ void simple() {
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

@Test
void skipTotalCount() {
SearchMapping mapping = setup( null );

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
.countEntitiesToIndexOnStart( false )
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
)
.add( "3", b -> b
.field( "title", TITLE_3 )
.field( "author", AUTHOR_3 )
)
.createAndExecuteFollowingWorks( indexingFuture )
.add( "2", b -> b
.field( "title", TITLE_2 )
.field( "author", AUTHOR_2 )
);

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( new StaticCountersMonitor() )
.startAndWait();
}
catch (SearchException ignored) {
// Expected, but not relevant to this test
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();

assertThat( staticCounters.get( StaticCountersMonitor.LOADED ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.BUILT ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.ADDED ) ).isEqualTo( 2 );
assertThat( staticCounters.get( StaticCountersMonitor.TOTAL ) ).isEqualTo( 0 );
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

private void initData() {
persist( new Book( 1, TITLE_1, AUTHOR_1 ) );
persist( new Book( 2, TITLE_2, AUTHOR_2 ) );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
*/
package org.hibernate.search.mapper.orm.massindexing;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.hibernate.CacheMode;
import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;
import org.hibernate.search.util.common.annotation.Incubating;

/**
Expand Down Expand Up @@ -228,12 +231,29 @@ public interface MassIndexer {
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* <p>
* Defaults to {@code false}.
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer failFast(boolean failFast);

/**
* Allows specifying whether to try determining the total number of entities of the particular type to index
* and passing that information to the monitor, or skipping the counting and passing an {@link OptionalLong#empty() empty optional} instead.
* as a count.
* <p>
* It may be helpful to skip the counting of entities and start the ID fetching right away to save some time.
* <p>
* Defaults to {@code true}.
* @param countEntitiesToIndexOnStart If {@code true}, the mass indexer will try determining the total number of entities,
* otherwise the mass indexer will not try obtaining the total count.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart);

}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ public MassIndexer failFast(boolean failFast) {
return this;
}

@Override
public MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart) {
delegate.countEntitiesToIndexOnStart( countEntitiesToIndexOnStart );
return this;
}

ConditionalExpression reindexOnly(Class<?> type, String conditionalExpression) {
return context.reindexOnly( type, conditionalExpression );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public void afterExecution(Context context) {
private Boolean purgeAtStart;
private Boolean mergeSegmentsAfterPurge;
private Boolean failFast;
private Boolean countEntitiesToIndexOnStart;
private Long failureFloodingThreshold = null;

private MassIndexingFailureHandler failureHandler;
Expand Down Expand Up @@ -205,7 +206,8 @@ private PojoMassIndexingBatchCoordinator createCoordinator() {
actualDropAndCreateSchemaOnStart,
// false if not set explicitly and dropAndCreateSchemaOnStart is set to true, otherwise true by default:
purgeAtStart == null ? !actualDropAndCreateSchemaOnStart : purgeAtStart,
mergeSegmentsAfterPurge
mergeSegmentsAfterPurge,
!Boolean.FALSE.equals( countEntitiesToIndexOnStart )
);
}

Expand Down Expand Up @@ -233,6 +235,12 @@ public PojoMassIndexer failFast(boolean failFast) {
return this;
}

@Override
public PojoMassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart) {
this.countEntitiesToIndexOnStart = countEntitiesToIndexOnStart;
return this;
}

private MassIndexingFailureHandler getOrCreateFailureHandler() {
MassIndexingFailureHandler handler = failureHandler;
if ( handler == null ) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public class PojoMassIndexingBatchCoordinator extends PojoMassIndexingFailureHan
private final boolean dropAndCreateSchemaOnStart;
private final boolean purgeAtStart;
private final Boolean mergeSegmentsAfterPurge;
private final boolean countEntitiesToIndexOnStart;

private final List<CompletableFuture<?>> indexingFutures = new ArrayList<>();

Expand All @@ -64,7 +65,8 @@ public PojoMassIndexingBatchCoordinator(PojoMassIndexingMappingContext mappingCo
PojoScopeDelegate<?, ?, ?> pojoScopeDelegate,
MassIndexingEnvironment environment,
int typesToIndexInParallel, int documentBuilderThreads, Boolean mergeSegmentsOnFinish,
boolean dropAndCreateSchemaOnStart, Boolean purgeAtStart, Boolean mergeSegmentsAfterPurge) {
boolean dropAndCreateSchemaOnStart, Boolean purgeAtStart, Boolean mergeSegmentsAfterPurge,
boolean countEntitiesToIndexOnStart) {
super( notifier, environment );
this.mappingContext = mappingContext;
this.typeGroupsToIndex = typeGroupsToIndex;
Expand All @@ -78,6 +80,7 @@ public PojoMassIndexingBatchCoordinator(PojoMassIndexingMappingContext mappingCo
this.dropAndCreateSchemaOnStart = dropAndCreateSchemaOnStart;
this.purgeAtStart = purgeAtStart;
this.mergeSegmentsAfterPurge = mergeSegmentsAfterPurge;
this.countEntitiesToIndexOnStart = countEntitiesToIndexOnStart;

this.agentStartContext = new PojoMassIndexerAgentStartContextImpl(
mappingContext.threadPoolProvider(),
Expand Down Expand Up @@ -193,7 +196,8 @@ private void doBatchWork() throws InterruptedException {
mappingContext, getNotifier(), getMassIndexingEnvironment(), typeGroup,
typeGroup.loadingStrategy(), massIndexingContext,
documentBuilderThreads,
context.tenantIdentifier()
context.tenantIdentifier(),
countEntitiesToIndexOnStart
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ public class PojoMassIndexingBatchIndexingWorkspace<E, I> extends PojoMassIndexi

private final int entityExtractingThreads;
private final String tenantId;
private final boolean countEntitiesToIndexOnStart;
private final MassIndexingTypeGroupMonitor typeGroupMonitor;

PojoMassIndexingBatchIndexingWorkspace(PojoMassIndexingMappingContext mappingContext,
Expand All @@ -57,14 +58,16 @@ public class PojoMassIndexingBatchIndexingWorkspace<E, I> extends PojoMassIndexi
PojoMassIndexingIndexedTypeGroup<E> typeGroup,
PojoMassLoadingStrategy<E, I> loadingStrategy,
PojoMassIndexingContext massIndexingContext,
int entityExtractingThreads, String tenantId) {
int entityExtractingThreads, String tenantId,
boolean countEntitiesToIndexOnStart) {
super( notifier, environment );
this.mappingContext = mappingContext;
this.typeGroup = typeGroup;
this.loadingStrategy = loadingStrategy;
this.massIndexingContext = massIndexingContext;
this.entityExtractingThreads = entityExtractingThreads;
this.tenantId = tenantId;
this.countEntitiesToIndexOnStart = countEntitiesToIndexOnStart;
this.typeGroupMonitor = notifier.typeGroupMonitor( new MassIndexingTypeGroupMonitorContextImpl( typeGroup ) );
}

Expand Down Expand Up @@ -113,7 +116,8 @@ private void startProducingPrimaryKeys(PojoProducerConsumerQueue<List<I>> identi
typeGroupMonitor,
massIndexingContext, getMassIndexingEnvironment(),
typeGroup, loadingStrategy,
identifierQueue, tenantId
identifierQueue, tenantId,
countEntitiesToIndexOnStart
);
//execIdentifiersLoader has size 1 and is not configurable: ensures the list is consistent as produced by one transaction
final ThreadPoolExecutor identifierProducingExecutor = mappingContext.threadPoolProvider().newFixedThreadPool(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,20 +32,23 @@ public class PojoMassIndexingEntityIdentifierLoadingRunnable<E, I>
private final PojoProducerConsumerQueue<List<I>> identifierQueue;
private final String tenantId;
private final MassIndexingEnvironment.EntityIdentifierLoadingContext identifierLoadingContext;
private final boolean countEntitiesToIndexOnStart;

public PojoMassIndexingEntityIdentifierLoadingRunnable(PojoMassIndexingNotifier notifier,
MassIndexingTypeGroupMonitor typeGroupMonitor,
PojoMassIndexingContext massIndexingContext, MassIndexingEnvironment environment,
PojoMassIndexingIndexedTypeGroup<E> typeGroup,
PojoMassLoadingStrategy<E, I> loadingStrategy,
PojoProducerConsumerQueue<List<I>> identifierQueue, String tenantId) {
PojoProducerConsumerQueue<List<I>> identifierQueue, String tenantId,
boolean countEntitiesToIndexOnStart) {
super( notifier, environment );
this.typeGroupMonitor = typeGroupMonitor;
this.massIndexingContext = massIndexingContext;
this.loadingStrategy = loadingStrategy;
this.typeGroup = typeGroup;
this.identifierQueue = identifierQueue;
this.tenantId = tenantId;
this.countEntitiesToIndexOnStart = countEntitiesToIndexOnStart;

this.identifierLoadingContext = new EntityIdentifierLoadingContextImpl();
}
Expand All @@ -55,7 +58,7 @@ protected void runWithFailureHandler() throws InterruptedException {
log.trace( "started" );
LoadingContext context = new LoadingContext();
try ( PojoMassIdentifierLoader loader = loadingStrategy.createIdentifierLoader( typeGroup.includedTypes(), context ) ) {
OptionalLong count = loader.totalCount();
OptionalLong count = countEntitiesToIndexOnStart ? loader.totalCount() : OptionalLong.empty();
typeGroupMonitor.indexingStarted( count );
if ( count.isPresent() ) {
getNotifier().reportAddedTotalCount( count.getAsLong() );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
Expand Down Expand Up @@ -165,11 +166,26 @@ public interface PojoMassIndexer {
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* <p>
* Defaults to {@code false}.
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
*/
@Incubating
PojoMassIndexer failFast(boolean failFast);

/**
* Allows specifying whether to skip the counting of entities to index before starting retrieving the entity IDs.
* <p>
* It may be helpful to skip the counting of entities and start the ID fetching right away to save some time.
* <p>
* Defaults to {@code true}.
* @param countEntitiesToIndexOnStart If {@code true}, the mass indexer will try calling {@link PojoMassIdentifierLoader#totalCount()}
* and passing the returned value to the monitors, otherwise the total count call will not be made and an empty optional will be passed to the monitors.
*
* @return {@code this} for method chaining
*/
@Incubating
PojoMassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart);
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@
*/
package org.hibernate.search.mapper.pojo.standalone.massindexing;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;
import org.hibernate.search.mapper.pojo.standalone.loading.MassLoadingOptions;
import org.hibernate.search.util.common.annotation.Incubating;

Expand Down Expand Up @@ -195,4 +198,20 @@ public interface MassIndexer {
*/
@Incubating
MassIndexer failFast(boolean failFast);

/**
* Allows specifying whether to try determining the total number of entities of the particular type to index
* and passing that information to the monitor, or skipping the counting and passing an {@link OptionalLong#empty() empty optional} instead.
* as a count.
* <p>
* It may be helpful to skip the counting of entities and start the ID fetching right away to save some time.
* <p>
* Defaults to {@code true}.
* @param countEntitiesToIndexOnStart If {@code true}, the mass indexer will try determining the total number of entities,
* otherwise the mass indexer will not try obtaining the total count.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart);
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,4 +110,10 @@ public MassIndexer failFast(boolean failFast) {
delegate.failFast( failFast );
return this;
}

@Override
public MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart) {
delegate.countEntitiesToIndexOnStart( countEntitiesToIndexOnStart );
return this;
}
}

0 comments on commit fc702c6

Please sign in to comment.