Skip to content

Commit

Permalink
HSEARCH-2945 Update mass indexer monitor configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 13, 2024
1 parent 0767e58 commit 12b8628
Show file tree
Hide file tree
Showing 21 changed files with 714 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ The default, built-in monitor logs progress periodically at the `INFO` level,
but a custom monitor can be set by implementing the `MassIndexingMonitor` interface
and passing an instance using the `monitor` method.

The built-in monitor's behaviour can be customized through `DefaultMassIndexingMonitor` builder,
e.g. `indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).build() ) )`

Implementations of `MassIndexingMonitor` must be thread-safe.

|`failureHandler(MassIndexingFailureHandler)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitor;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.logging.log4j.Level;

class MassIndexingDefaultMonitorIT {

private static final int COUNT = 100;
@RegisterExtension
public final BackendMock backendMock = BackendMock.create();

@RegisterExtension
public final StandalonePojoMappingSetupHelper setupHelper =
StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock );

@RegisterExtension
public ExpectedLog4jLog logged = ExpectedLog4jLog.create();

private final StubLoadingContext loadingContext = new StubLoadingContext();

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnBeforeType(boolean doCounts) {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
if ( doCounts ) {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
}
else {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
}
}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnBeforeType( doCounts ).build() ) );
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnStart(boolean doCounts) {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
if ( doCounts ) {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.once();
}
else {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}
}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( doCounts ).build() ) );
}

@Test
void noCountsAtAll() {
actualTest( () -> {
logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" ).once();
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}, indexer -> indexer
.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).countOnBeforeType( false ).build() ) );
}

private void actualTest(Runnable expectedLogs, Consumer<MassIndexer> massIndexerConfiguration) {
backendMock.expectAnySchema( Book.NAME );

SearchMapping mapping = setupHelper.start()
.expectCustomBeans()
.setup( Book.class );

backendMock.verifyExpectationsMet();

initData();

expectedLogs.run();

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
massIndexerConfiguration.accept( indexer );
indexer.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

private void initData() {
for ( int i = 0; i < COUNT; i++ ) {
persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) );
}
}

private void persist(Book book) {
loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book );
}

@SearchEntity(name = Book.NAME,
loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class))
@Indexed(index = Book.NAME)
public static class Book {

public static final String NAME = "Book";
public static final PersistenceTypeKey<Book, Integer> PERSISTENCE_KEY =
new PersistenceTypeKey<>( Book.class, Integer.class );

@DocumentId
private Integer id;

@GenericField
private String title;

@GenericField
private String author;

public Book() {
}

public Book(Integer id, String title, String author) {
this.id = id;
this.title = title;
this.author = author;
}

public Integer getId() {
return id;
}

public String getTitle() {
return title;
}

public String getAuthor() {
return author;
}
}

private static class SimulatedFailure extends RuntimeException {
SimulatedFailure(String message) {
super( message );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
Expand Down Expand Up @@ -128,6 +131,66 @@ void simple() {
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

@Test
void skipTotalCount() {
SearchMapping mapping = setup( null );

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
)
.add( "3", b -> b
.field( "title", TITLE_3 )
.field( "author", AUTHOR_3 )
)
.createAndExecuteFollowingWorks( indexingFuture )
.add( "2", b -> b
.field( "title", TITLE_2 )
.field( "author", AUTHOR_2 )
);

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( new StaticCountersMonitor( false ) )
.startAndWait();
}
catch (SearchException ignored) {
// Expected, but not relevant to this test
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();

assertThat( staticCounters.get( StaticCountersMonitor.LOADED ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.BUILT ) ).isEqualTo( 3 );
assertThat( staticCounters.get( StaticCountersMonitor.ADDED ) ).isEqualTo( 2 );
assertThat( staticCounters.get( StaticCountersMonitor.TOTAL ) ).isEqualTo( 0 );
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

private void initData() {
persist( new Book( 1, TITLE_1, AUTHOR_1 ) );
persist( new Book( 2, TITLE_2, AUTHOR_2 ) );
Expand Down Expand Up @@ -178,6 +241,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand All @@ -186,6 +250,42 @@ public static class StaticCountersMonitor implements MassIndexingMonitor {
public static StaticCounters.Key TOTAL = StaticCounters.createKey();
public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey();


private final boolean requiresTotalCount;

public StaticCountersMonitor() {
this( true );
}

public StaticCountersMonitor(boolean requiresTotalCount) {
this.requiresTotalCount = requiresTotalCount;
}

@Override
public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) {
if ( requiresTotalCount ) {
return MassIndexingMonitor.super.typeGroupMonitor( context );
}
else {
return new MassIndexingTypeGroupMonitor() {
@Override
public void documentsIndexed(long increment) {
// do nothing
}

@Override
public void indexingStarted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}

@Override
public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}
};
}
}

@Override
public void documentsAdded(long increment) {
StaticCounters.get().add( ADDED, (int) increment );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public interface MassIndexer {
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* <p>
* Defaults to {@code false}.
* @param failFast Whether to enabled fail fast option for this mass indexer.
*
* @return {@code this} for method chaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @author Hardy Ferentschik
* @deprecated move to {@link org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor}.
*/
@SuppressWarnings("removal")
@Deprecated
public interface MassIndexingMonitor extends org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,9 @@ void indexingProgressWithRemainingTime(float estimatePercentileComplete, long do
@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 168, value = "Mass indexing complete in %3$s. Indexed %1$d/%2$d entities.")
void indexingEntitiesCompleted(long indexed, long total, Duration indexingTime);

@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 169,
value = "Mass indexing is going to index approx. %1$d entities (%2$s). Actual number may change once the indexing starts.")
void indexingEntitiesApprox(long count, String types);
}
Loading

0 comments on commit 12b8628

Please sign in to comment.