Skip to content

Commit

Permalink
HSEARCH-2945 Use a default mass indexing monitor configurer
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 11, 2024
1 parent fc702c6 commit 452f7ee
Show file tree
Hide file tree
Showing 17 changed files with 547 additions and 79 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.cfg.EngineSettings;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.logging.log4j.Level;

class MassIndexingDefaultMonitorIT {

private static final int COUNT = 100;
@RegisterExtension
public final BackendMock backendMock = BackendMock.create();

@RegisterExtension
public final StandalonePojoMappingSetupHelper setupHelper =
StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock );

@RegisterExtension
public ExpectedLog4jLog logged = ExpectedLog4jLog.create();

private final StubLoadingContext loadingContext = new StubLoadingContext();

private SearchMapping setup(String failureHandler) {
backendMock.expectAnySchema( Book.NAME );

SearchMapping mapping = setupHelper.start()
.expectCustomBeans()
.withPropertyRadical( EngineSettings.BACKGROUND_FAILURE_HANDLER, failureHandler )
.setup( Book.class );

backendMock.verifyExpectationsMet();

initData();

return mapping;
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void skipTotalCount(boolean doCounts) {
SearchMapping mapping = setup( null );

logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
if ( doCounts ) {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
}
else {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
}

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( f -> f.countEntitiesToIndexOnStart( doCounts ) )
.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void totalCountAtStart(boolean doCounts) {
SearchMapping mapping = setup( null );

logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
if ( doCounts ) {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.once();
}
else {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( f -> f.countAllEntitiesBeforeStart( doCounts ) )
.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

private void initData() {
for ( int i = 0; i < COUNT; i++ ) {
persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) );
}
}

private void persist(Book book) {
loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book );
}

@SearchEntity(name = Book.NAME,
loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class))
@Indexed(index = Book.NAME)
public static class Book {

public static final String NAME = "Book";
public static final PersistenceTypeKey<Book, Integer> PERSISTENCE_KEY =
new PersistenceTypeKey<>( Book.class, Integer.class );

@DocumentId
private Integer id;

@GenericField
private String title;

@GenericField
private String author;

public Book() {
}

public Book(Integer id, String title, String author) {
this.id = id;
this.title = title;
this.author = author;
}

public Integer getId() {
return id;
}

public String getTitle() {
return title;
}

public String getAuthor() {
return author;
}
}

private static class SimulatedFailure extends RuntimeException {
SimulatedFailure(String message) {
super( message );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ void simple() {
// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
Expand Down Expand Up @@ -134,7 +134,6 @@ void skipTotalCount() {

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
.countEntitiesToIndexOnStart( false )
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

Expand All @@ -144,8 +143,8 @@ void skipTotalCount() {
// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
Expand All @@ -169,7 +168,7 @@ void skipTotalCount() {
.refresh();

try {
indexer.monitor( new StaticCountersMonitor() )
indexer.monitor( new StaticCountersMonitor( false ) )
.startAndWait();
}
catch (SearchException ignored) {
Expand Down Expand Up @@ -247,6 +246,22 @@ public static class StaticCountersMonitor implements MassIndexingMonitor {
public static StaticCounters.Key TOTAL = StaticCounters.createKey();
public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey();


private final boolean requiresTotalCount;

public StaticCountersMonitor() {
this( true );
}

public StaticCountersMonitor(boolean requiresTotalCount) {
this.requiresTotalCount = requiresTotalCount;
}

@Override
public boolean requiresTotalCount() {
return requiresTotalCount;
}

@Override
public void documentsAdded(long increment) {
StaticCounters.get().add( ADDED, (int) increment );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
*/
package org.hibernate.search.mapper.orm.massindexing;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.hibernate.CacheMode;
import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitorConfigurationContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;
import org.hibernate.search.util.common.annotation.Incubating;

/**
Expand Down Expand Up @@ -241,19 +240,13 @@ public interface MassIndexer {
MassIndexer failFast(boolean failFast);

/**
* Allows specifying whether to try determining the total number of entities of the particular type to index
* and passing that information to the monitor, or skipping the counting and passing an {@link OptionalLong#empty() empty optional} instead.
* as a count.
* Configures the default {@link MassIndexingMonitor}.
* <p>
* It may be helpful to skip the counting of entities and start the ID fetching right away to save some time.
* <p>
* Defaults to {@code true}.
* @param countEntitiesToIndexOnStart If {@code true}, the mass indexer will try determining the total number of entities,
* otherwise the mass indexer will not try obtaining the total count.
* The default monitor just logs the progress. This configurer allows to influence the logging behavior.
*
* @param configurer The monitor that will track mass indexing progress.
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart);
MassIndexer monitor(Consumer<DefaultMassIndexingMonitorConfigurationContext> configurer);

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
package org.hibernate.search.mapper.orm.massindexing.impl;

import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.hibernate.CacheMode;
import org.hibernate.search.mapper.orm.loading.spi.ConditionalExpression;
import org.hibernate.search.mapper.orm.massindexing.MassIndexer;
import org.hibernate.search.mapper.orm.massindexing.MassIndexerFilteringTypeStep;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitorConfigurationContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
Expand Down Expand Up @@ -132,8 +134,8 @@ public MassIndexer failFast(boolean failFast) {
}

@Override
public MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart) {
delegate.countEntitiesToIndexOnStart( countEntitiesToIndexOnStart );
public MassIndexer monitor(Consumer<DefaultMassIndexingMonitorConfigurationContext> configurer) {
delegate.monitor( configurer );
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1042,4 +1042,9 @@ void indexingProgressWithRemainingTime(float estimatePercentileComplete, long do
@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 168, value = "Mass indexing complete in %3$s. Indexed %1$d/%2$d entities.")
void indexingEntitiesCompleted(long indexed, long total, Duration indexingTime);

@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 169,
value = "Mass indexing is going to index approx. %1$d entities (%2$s). Actual number may change once the indexing starts.")
void indexingEntitiesApprox(long count, String types);
}
Loading

0 comments on commit 452f7ee

Please sign in to comment.