Skip to content

Commit

Permalink
HSEARCH-2945 Update mass indexer monitor configuration
Browse files Browse the repository at this point in the history
  • Loading branch information
marko-bekhta committed Sep 12, 2024
1 parent fc702c6 commit 3eaf77f
Show file tree
Hide file tree
Showing 25 changed files with 640 additions and 220 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;

import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.engine.cfg.EngineSettings;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitor;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.logging.log4j.Level;

class MassIndexingDefaultMonitorIT {

private static final int COUNT = 100;
@RegisterExtension
public final BackendMock backendMock = BackendMock.create();

@RegisterExtension
public final StandalonePojoMappingSetupHelper setupHelper =
StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock );

@RegisterExtension
public ExpectedLog4jLog logged = ExpectedLog4jLog.create();

private final StubLoadingContext loadingContext = new StubLoadingContext();

private SearchMapping setup(String failureHandler) {
backendMock.expectAnySchema( Book.NAME );

SearchMapping mapping = setupHelper.start()
.expectCustomBeans()
.withPropertyRadical( EngineSettings.BACKGROUND_FAILURE_HANDLER, failureHandler )
.setup( Book.class );

backendMock.verifyExpectationsMet();

initData();

return mapping;
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnBeforeType(boolean doCounts) {
SearchMapping mapping = setup( null );

logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
if ( doCounts ) {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
}
else {
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).never();
}

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( DefaultMassIndexingMonitor.builder().countOnBeforeType( doCounts ).build() )
.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

@ValueSource(booleans = { true, false })
@ParameterizedTest
void countOnStart(boolean doCounts) {
SearchMapping mapping = setup( null );

logged.expectEvent( Level.INFO, "Mass indexing complete in ", ". Indexed 100/100 entities" );
logged.expectEvent( Level.INFO, "Mass indexing is going to index 100 entities" ).once();
if ( doCounts ) {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.once();
}
else {
logged.expectEvent( Level.INFO,
"Mass indexing is going to index approx. 100 entities ([ Book ]). Actual number may change once the indexing starts." )
.never();
}

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

CompletableFuture<?> indexingFuture = new CompletableFuture<>();
indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

// add operations on indexes can follow any random order,
// since they are executed by different threads
BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
);
for ( int i = 0; i < COUNT; i++ ) {
final String id = Integer.toString( i );
expectWorks
.add( id, b -> b
.field( "title", "TITLE_" + id )
.field( "author", "AUTHOR_" + id )
);
}

// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
// so we expect 1 purge, 1 mergeSegments and 1 flush calls in this order:
backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
.purge()
.mergeSegments()
.flush()
.refresh();

try {
indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( doCounts ).build() )
.startAndWait();
}
catch (InterruptedException e) {
fail( "Unexpected InterruptedException: " + e.getMessage() );
}
}

backendMock.verifyExpectationsMet();
}

private void initData() {
for ( int i = 0; i < COUNT; i++ ) {
persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) );
}
}

private void persist(Book book) {
loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book );
}

@SearchEntity(name = Book.NAME,
loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class))
@Indexed(index = Book.NAME)
public static class Book {

public static final String NAME = "Book";
public static final PersistenceTypeKey<Book, Integer> PERSISTENCE_KEY =
new PersistenceTypeKey<>( Book.class, Integer.class );

@DocumentId
private Integer id;

@GenericField
private String title;

@GenericField
private String author;

public Book() {
}

public Book(Integer id, String title, String author) {
this.id = id;
this.title = title;
this.author = author;
}

public Integer getId() {
return id;
}

public String getTitle() {
return title;
}

public String getAuthor() {
return author;
}
}

private static class SimulatedFailure extends RuntimeException {
SimulatedFailure(String message) {
super( message );
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
Expand Down Expand Up @@ -83,8 +86,8 @@ void simple() {
// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
Expand Down Expand Up @@ -134,7 +137,6 @@ void skipTotalCount() {

try ( SearchSession searchSession = mapping.createSession() ) {
MassIndexer indexer = searchSession.massIndexer()
.countEntitiesToIndexOnStart( false )
// Simulate passing information to connect to a DB, ...
.context( StubLoadingContext.class, loadingContext );

Expand All @@ -144,8 +146,8 @@ void skipTotalCount() {
// add operations on indexes can follow any random order,
// since they are executed by different threads
backendMock.expectWorks(
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
)
.add( "1", b -> b
.field( "title", TITLE_1 )
.field( "author", AUTHOR_1 )
Expand All @@ -169,7 +171,7 @@ void skipTotalCount() {
.refresh();

try {
indexer.monitor( new StaticCountersMonitor() )
indexer.monitor( new StaticCountersMonitor( false ) )
.startAndWait();
}
catch (SearchException ignored) {
Expand Down Expand Up @@ -239,6 +241,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand All @@ -247,6 +250,42 @@ public static class StaticCountersMonitor implements MassIndexingMonitor {
public static StaticCounters.Key TOTAL = StaticCounters.createKey();
public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey();


private final boolean requiresTotalCount;

public StaticCountersMonitor() {
this( true );
}

public StaticCountersMonitor(boolean requiresTotalCount) {
this.requiresTotalCount = requiresTotalCount;
}

@Override
public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) {
if ( requiresTotalCount ) {
return MassIndexingMonitor.super.typeGroupMonitor( context );
}
else {
return new MassIndexingTypeGroupMonitor() {
@Override
public void documentsIndexed(long increment) {
// do nothing
}

@Override
public void indexingStarted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}

@Override
public void indexingCompleted(MassIndexingTypeGroupMonitorContext context) {
// do nothing
}
};
}
}

@Override
public void documentsAdded(long increment) {
StaticCounters.get().add( ADDED, (int) increment );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@
*/
package org.hibernate.search.mapper.orm.massindexing;

import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import org.hibernate.CacheMode;
import org.hibernate.search.mapper.pojo.loading.spi.PojoMassIdentifierLoader;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingEnvironment;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingFailureHandler;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.spi.PojoMassIndexer;
import org.hibernate.search.util.common.annotation.Incubating;

/**
Expand Down Expand Up @@ -240,20 +237,4 @@ public interface MassIndexer {
@Incubating
MassIndexer failFast(boolean failFast);

/**
* Allows specifying whether to try determining the total number of entities of the particular type to index
* and passing that information to the monitor, or skipping the counting and passing an {@link OptionalLong#empty() empty optional} instead.
* as a count.
* <p>
* It may be helpful to skip the counting of entities and start the ID fetching right away to save some time.
* <p>
* Defaults to {@code true}.
* @param countEntitiesToIndexOnStart If {@code true}, the mass indexer will try determining the total number of entities,
* otherwise the mass indexer will not try obtaining the total count.
*
* @return {@code this} for method chaining
*/
@Incubating
MassIndexer countEntitiesToIndexOnStart(boolean countEntitiesToIndexOnStart);

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
* @author Hardy Ferentschik
* @deprecated move to {@link org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor}.
*/
@SuppressWarnings("removal")
@Deprecated
public interface MassIndexingMonitor extends org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor {

Expand Down
Loading

0 comments on commit 3eaf77f

Please sign in to comment.