Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HSEARCH-2945 Configure default mass indexing logging monitor #4314

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions documentation/src/main/asciidoc/migration/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ But there are next changes:
This was done to address the scenarios where the total number of identifiers to load is not known ahead of time.
- The deprecated `org.hibernate.search.mapper.orm.massindexing.MassIndexingFailureHandler` and `org.hibernate.search.mapper.orm.massindexing.MassIndexingMonitor`
interfaces are removed in this version. Their alternatives have been available in the `org.hibernate.search.mapper.pojo.massindexing` package for a while now.
- `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#addToTotalCount(..)` is deprecated for removal.
Instead, we are introducing the `org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor`
that can be obtained through `org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor#typeGroupMonitor(..)`.
This new type group monitor has more flexibility and also allows implementors to skip total count computations if needed.

[[spi]]
== SPI
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,9 @@ The default, built-in monitor logs progress periodically at the `INFO` level,
but a custom monitor can be set by implementing the `MassIndexingMonitor` interface
and passing an instance using the `monitor` method.

The built-in monitor's behaviour can be customized through the `DefaultMassIndexingMonitor` builder,
e.g. `indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).build() )`.

Implementations of `MassIndexingMonitor` must be thread-safe.

|`failureHandler(MassIndexingFailureHandler)`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
/*
* SPDX-License-Identifier: Apache-2.0
* Copyright Red Hat Inc. and Hibernate Authors
*/
package org.hibernate.search.integrationtest.mapper.pojo.massindexing;

import static org.assertj.core.api.Fail.fail;

import java.lang.invoke.MethodHandles;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import org.hibernate.search.engine.backend.work.execution.DocumentCommitStrategy;
import org.hibernate.search.engine.backend.work.execution.DocumentRefreshStrategy;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.PersistenceTypeKey;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubEntityLoadingBinder;
import org.hibernate.search.integrationtest.mapper.pojo.testsupport.loading.StubLoadingContext;
import org.hibernate.search.mapper.pojo.loading.mapping.annotation.EntityLoadingBinderRef;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.DocumentId;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.GenericField;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.DefaultMassIndexingMonitor;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
import org.hibernate.search.util.impl.integrationtest.common.extension.BackendMock;
import org.hibernate.search.util.impl.integrationtest.mapper.pojo.standalone.StandalonePojoMappingSetupHelper;
import org.hibernate.search.util.impl.test.extension.ExpectedLog4jLog;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import org.apache.logging.log4j.Level;

/**
 * Integration tests for the count-related options of {@link DefaultMassIndexingMonitor}.
 * <p>
 * Each test mass-indexes {@value #COUNT} {@link Book} entities against a mocked backend
 * and checks which "going to index" messages are logged at the INFO level
 * depending on the {@code countOnStart} / {@code countOnBeforeType} builder settings.
 */
class MassIndexingDefaultMonitorIT {

	private static final int COUNT = 100;

	// Expected log message fragments; derived from COUNT and Book.NAME
	// so that they cannot drift away from the test data.
	private static final String COMPLETED_PREFIX = "Mass indexing complete in ";
	private static final String COMPLETED_SUFFIX = ". Indexed " + COUNT + "/" + COUNT + " entities";
	private static final String EXACT_COUNT_MESSAGE = "Mass indexing is going to index " + COUNT + " entities";
	private static final String APPROX_COUNT_MESSAGE = "Mass indexing is going to index approx. " + COUNT
			+ " entities ([ " + Book.NAME + " ]). Actual number may change once the indexing starts.";

	@RegisterExtension
	public final BackendMock backendMock = BackendMock.create();

	@RegisterExtension
	public final StandalonePojoMappingSetupHelper setupHelper =
			StandalonePojoMappingSetupHelper.withBackendMock( MethodHandles.lookup(), backendMock );

	@RegisterExtension
	public final ExpectedLog4jLog logged = ExpectedLog4jLog.create();

	private final StubLoadingContext loadingContext = new StubLoadingContext();

	/**
	 * The exact-count message must be logged once when {@code countOnBeforeType} is enabled,
	 * and never when it is disabled.
	 */
	@ValueSource(booleans = { true, false })
	@ParameterizedTest
	void countOnBeforeType(boolean doCounts) {
		actualTest( () -> {
			logged.expectEvent( Level.INFO, COMPLETED_PREFIX, COMPLETED_SUFFIX );
			expectInfoLogged( EXACT_COUNT_MESSAGE, doCounts );
		}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnBeforeType( doCounts ).build() ) );
	}

	/**
	 * The approximate-count message must be logged once when {@code countOnStart} is enabled,
	 * and never when it is disabled; the exact-count message is expected once in both cases.
	 */
	@ValueSource(booleans = { true, false })
	@ParameterizedTest
	void countOnStart(boolean doCounts) {
		actualTest( () -> {
			logged.expectEvent( Level.INFO, COMPLETED_PREFIX, COMPLETED_SUFFIX );
			logged.expectEvent( Level.INFO, EXACT_COUNT_MESSAGE ).once();
			expectInfoLogged( APPROX_COUNT_MESSAGE, doCounts );
		}, indexer -> indexer.monitor( DefaultMassIndexingMonitor.builder().countOnStart( doCounts ).build() ) );
	}

	/**
	 * With both count options disabled, neither count message may be logged,
	 * but the completion message is still expected once.
	 */
	@Test
	void noCountsAtAll() {
		actualTest( () -> {
			logged.expectEvent( Level.INFO, COMPLETED_PREFIX, COMPLETED_SUFFIX ).once();
			logged.expectEvent( Level.INFO, EXACT_COUNT_MESSAGE ).never();
			logged.expectEvent( Level.INFO, APPROX_COUNT_MESSAGE ).never();
		}, indexer -> indexer
				.monitor( DefaultMassIndexingMonitor.builder().countOnStart( false ).countOnBeforeType( false ).build() ) );
	}

	/**
	 * Registers an INFO-level log expectation for the given message.
	 *
	 * @param message The text the log event is expected to contain.
	 * @param expected {@code true} to expect the message exactly once, {@code false} to expect it never.
	 */
	private void expectInfoLogged(String message, boolean expected) {
		if ( expected ) {
			logged.expectEvent( Level.INFO, message ).once();
		}
		else {
			logged.expectEvent( Level.INFO, message ).never();
		}
	}

	/**
	 * Starts the mapping, stores {@value #COUNT} books, then runs a mass indexing
	 * and verifies that the backend received all the expected works.
	 *
	 * @param expectedLogs Registers the log expectations; run after the data is initialized
	 * and before the mass indexer is created.
	 * @param massIndexerConfiguration Applies the test-specific settings to the mass indexer
	 * right before it is started.
	 */
	private void actualTest(Runnable expectedLogs, Consumer<MassIndexer> massIndexerConfiguration) {
		backendMock.expectAnySchema( Book.NAME );

		SearchMapping mapping = setupHelper.start()
				.expectCustomBeans()
				.setup( Book.class );

		backendMock.verifyExpectationsMet();

		initData();

		expectedLogs.run();

		try ( SearchSession searchSession = mapping.createSession() ) {
			MassIndexer indexer = searchSession.massIndexer()
					// Simulate passing information to connect to a DB, ...
					.context( StubLoadingContext.class, loadingContext );

			// add operations on indexes can follow any random order,
			// since they are executed by different threads
			BackendMock.DocumentWorkCallListContext expectWorks = backendMock.expectWorks(
					Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
			);
			for ( int i = 0; i < COUNT; i++ ) {
				final String id = Integer.toString( i );
				expectWorks
						.add( id, b -> b
								.field( "title", "TITLE_" + id )
								.field( "author", "AUTHOR_" + id )
						);
			}

			// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
			// so we expect purge, mergeSegments, flush and refresh calls in this order:
			backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
					.purge()
					.mergeSegments()
					.flush()
					.refresh();

			try {
				massIndexerConfiguration.accept( indexer );
				indexer.startAndWait();
			}
			catch (InterruptedException e) {
				// Restore the interrupt status and keep the original exception as the failure cause.
				Thread.currentThread().interrupt();
				fail( "Unexpected InterruptedException: " + e.getMessage(), e );
			}
		}

		backendMock.verifyExpectationsMet();
	}

	/**
	 * Stores {@value #COUNT} books with ids {@code 0..COUNT-1} in the stub loading context.
	 */
	private void initData() {
		for ( int i = 0; i < COUNT; i++ ) {
			persist( new Book( i, "TITLE_" + i, "AUTHOR_" + i ) );
		}
	}

	private void persist(Book book) {
		loadingContext.persistenceMap( Book.PERSISTENCE_KEY ).put( book.id, book );
	}

	/**
	 * Indexed test entity, loaded through the {@link StubEntityLoadingBinder}.
	 */
	@SearchEntity(name = Book.NAME,
			loadingBinder = @EntityLoadingBinderRef(type = StubEntityLoadingBinder.class))
	@Indexed(index = Book.NAME)
	public static class Book {

		public static final String NAME = "Book";
		public static final PersistenceTypeKey<Book, Integer> PERSISTENCE_KEY =
				new PersistenceTypeKey<>( Book.class, Integer.class );

		@DocumentId
		private Integer id;

		@GenericField
		private String title;

		@GenericField
		private String author;

		public Book() {
		}

		public Book(Integer id, String title, String author) {
			this.id = id;
			this.title = title;
			this.author = author;
		}

		public Integer getId() {
			return id;
		}

		public String getTitle() {
			return title;
		}

		public String getAuthor() {
			return author;
		}
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.Indexed;
import org.hibernate.search.mapper.pojo.mapping.definition.annotation.SearchEntity;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitor;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorContext;
import org.hibernate.search.mapper.pojo.massindexing.MassIndexingTypeGroupMonitorCreateContext;
import org.hibernate.search.mapper.pojo.standalone.mapping.SearchMapping;
import org.hibernate.search.mapper.pojo.standalone.massindexing.MassIndexer;
import org.hibernate.search.mapper.pojo.standalone.session.SearchSession;
Expand Down Expand Up @@ -128,6 +131,66 @@ void simple() {
assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

/**
 * Runs a mass indexing with a {@code StaticCountersMonitor} created with {@code requiresTotalCount = false}
 * and checks that the {@code TOTAL} counter stays at 0
 * while the loaded/built/added/completed counters are still updated.
 * <p>
 * One of the three "add" works is set up to fail, which is why only 2 documents are counted as added.
 */
@Test
void skipTotalCount() {
	SearchMapping mapping = setup( null );

	try ( SearchSession searchSession = mapping.createSession() ) {
		MassIndexer indexer = searchSession.massIndexer()
				// Simulate passing information to connect to a DB, ...
				.context( StubLoadingContext.class, loadingContext );

		// Future used to make the works that follow it in the expectation chain fail.
		CompletableFuture<?> indexingFuture = new CompletableFuture<>();
		indexingFuture.completeExceptionally( new SimulatedFailure( "Indexing error" ) );

		// add operations on indexes can follow any random order,
		// since they are executed by different threads
		backendMock.expectWorks(
				Book.NAME, DocumentCommitStrategy.NONE, DocumentRefreshStrategy.NONE
		)
				.add( "1", b -> b
						.field( "title", TITLE_1 )
						.field( "author", AUTHOR_1 )
				)
				.add( "3", b -> b
						.field( "title", TITLE_3 )
						.field( "author", AUTHOR_3 )
				)
				.createAndExecuteFollowingWorks( indexingFuture )
				.add( "2", b -> b
						.field( "title", TITLE_2 )
						.field( "author", AUTHOR_2 )
				);

		// purgeAtStart and mergeSegmentsAfterPurge are enabled by default,
		// so we expect purge, mergeSegments, flush and refresh calls in this order:
		backendMock.expectIndexScaleWorks( Book.NAME, searchSession.tenantIdentifierValue() )
				.purge()
				.mergeSegments()
				.flush()
				.refresh();

		try {
			indexer.monitor( new StaticCountersMonitor( false ) )
					.startAndWait();
		}
		catch (SearchException ignored) {
			// Expected, but not relevant to this test
		}
		catch (InterruptedException e) {
			// Restore the interrupt status before failing, so that it is not silently lost.
			Thread.currentThread().interrupt();
			fail( "Unexpected InterruptedException: " + e.getMessage() );
		}
	}

	backendMock.verifyExpectationsMet();

	assertThat( staticCounters.get( StaticCountersMonitor.LOADED ) ).isEqualTo( 3 );
	assertThat( staticCounters.get( StaticCountersMonitor.BUILT ) ).isEqualTo( 3 );
	assertThat( staticCounters.get( StaticCountersMonitor.ADDED ) ).isEqualTo( 2 );
	// The monitor was created with requiresTotalCount = false, so no total is expected to be reported.
	assertThat( staticCounters.get( StaticCountersMonitor.TOTAL ) ).isEqualTo( 0 );
	assertThat( staticCounters.get( StaticCountersMonitor.INDEXING_COMPLETED ) ).isEqualTo( 1 );
}

private void initData() {
persist( new Book( 1, TITLE_1, AUTHOR_1 ) );
persist( new Book( 2, TITLE_2, AUTHOR_2 ) );
Expand Down Expand Up @@ -178,6 +241,7 @@ public String getAuthor() {
}
}

@SuppressWarnings("removal")
public static class StaticCountersMonitor implements MassIndexingMonitor {

public static StaticCounters.Key ADDED = StaticCounters.createKey();
Expand All @@ -186,6 +250,42 @@ public static class StaticCountersMonitor implements MassIndexingMonitor {
public static StaticCounters.Key TOTAL = StaticCounters.createKey();
public static StaticCounters.Key INDEXING_COMPLETED = StaticCounters.createKey();


// When true, typeGroupMonitor(..) delegates to the interface's default implementation;
// when false, it returns a type group monitor whose callbacks do nothing.
private final boolean requiresTotalCount;

/**
 * Creates a monitor with {@code requiresTotalCount} set to {@code true}.
 */
public StaticCountersMonitor() {
this( true );
}

/**
 * @param requiresTotalCount Whether {@code typeGroupMonitor(..)} should delegate to the default
 * implementation ({@code true}) or return a do-nothing type group monitor ({@code false}).
 */
public StaticCountersMonitor(boolean requiresTotalCount) {
this.requiresTotalCount = requiresTotalCount;
}

/**
 * Returns the type group monitor to use for a group of types.
 * <p>
 * Delegates to the default implementation of {@link MassIndexingMonitor} when total counts are required;
 * otherwise returns a fresh monitor whose callbacks are all no-ops.
 */
@Override
public MassIndexingTypeGroupMonitor typeGroupMonitor(MassIndexingTypeGroupMonitorCreateContext context) {
	if ( !requiresTotalCount ) {
		// Total counts are not wanted: ignore every type group notification.
		return new MassIndexingTypeGroupMonitor() {
			@Override
			public void documentsIndexed(long increment) {
				// intentionally empty
			}

			@Override
			public void indexingStarted(MassIndexingTypeGroupMonitorContext groupContext) {
				// intentionally empty
			}

			@Override
			public void indexingCompleted(MassIndexingTypeGroupMonitorContext groupContext) {
				// intentionally empty
			}
		};
	}

	return MassIndexingMonitor.super.typeGroupMonitor( context );
}

@Override
public void documentsAdded(long increment) {
StaticCounters.get().add( ADDED, (int) increment );
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,8 @@ public interface MassIndexer {
* <p>
* With fail-fast option enabled, the mass indexer will request cancelling all internal mass-indexing processes
* right after the first error is reported to the {@link MassIndexingFailureHandler}.
*
* <p>
* Defaults to {@code false}.
* @param failFast Whether to enable the fail-fast option for this mass indexer.
*
* @return {@code this} for method chaining
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,4 +1041,9 @@ void indexingProgressWithRemainingTime(float estimatePercentileComplete, long do
/**
 * Logs, at the INFO level, that mass indexing is complete.
 *
 * @param indexed The number of indexed entities (first number of the "indexed/total" pair in the message).
 * @param total The total number of entities (second number of the "indexed/total" pair in the message).
 * @param indexingTime The duration rendered after "complete in".
 */
@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 168, value = "Mass indexing complete in %3$s. Indexed %1$d/%2$d entities.")
void indexingEntitiesCompleted(long indexed, long total, Duration indexingTime);

/**
 * Logs, at the INFO level, the approximate number of entities that mass indexing is going to index.
 *
 * @param count The approximate entity count rendered in the message.
 * @param types The text rendered between parentheses after the count.
 */
@LogMessage(level = INFO)
@Message(id = ID_OFFSET + 169,
value = "Mass indexing is going to index approx. %1$d entities (%2$s). Actual number may change once the indexing starts.")
void indexingEntitiesApprox(long count, String types);
}
Loading