Merge branch '7.5' into gh-4527-content-search-colon
at055612 committed Oct 15, 2024
Commit 05aed97 (2 parents: 9b16de6 + c66eae7)
Showing 8 changed files with 185 additions and 85 deletions.
-------------------- File 1 of 8 --------------------
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Crown Copyright
* Copyright 2018-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -44,6 +44,10 @@ protected void configure() {
.frequencySchedule("1h"));
}


// --------------------------------------------------------------------------------


private static class IndexContent extends RunnableWrapper {

@Inject
-------------------- File 2 of 8 --------------------
@@ -32,11 +32,14 @@
import stroom.index.lucene980.analyser.AnalyzerFactory;
import stroom.security.api.SecurityContext;
import stroom.security.shared.DocumentPermissionNames;
import stroom.task.api.TaskContext;
import stroom.task.api.TaskContextFactory;
import stroom.task.api.TerminateHandlerFactory;
import stroom.util.NullSafe;
import stroom.util.concurrent.UncheckedInterruptedException;
import stroom.util.entityevent.EntityAction;
import stroom.util.entityevent.EntityEvent;
import stroom.util.exception.ThrowingRunnable;
import stroom.util.io.FileUtil;
import stroom.util.io.TempDirProvider;
import stroom.util.logging.LambdaLogger;
@@ -48,6 +51,7 @@
import stroom.util.shared.ResultPage;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.apache.lucene980.analysis.Analyzer;
import org.apache.lucene980.analysis.LowerCaseFilter;
import org.apache.lucene980.analysis.TokenStream;
@@ -89,7 +93,10 @@
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

@Singleton // Only want ctor re-index to happen once on boot
public class LuceneContentIndex implements ContentIndex, EntityEvent.Handler {

private static final int MIN_GRAM = 1;
@@ -113,6 +120,7 @@ public class LuceneContentIndex implements ContentIndex, EntityEvent.Handler {
private final TaskContextFactory taskContextFactory;
private final Analyzer analyzer;
private final ArrayBlockingQueue<EntityEvent> changes = new ArrayBlockingQueue<>(10000);
private final Path docIndexDir;
private final Directory directory;

@Inject
@@ -141,7 +149,7 @@ public LuceneContentIndex(final TempDirProvider tempDirProvider,
analyzerMap.put(TEXT, AnalyzerFactory.create(AnalyzerType.KEYWORD, true));
analyzer = new PerFieldAnalyzerWrapper(new KeywordAnalyzer(), analyzerMap);

final Path docIndexDir = tempDirProvider.get().resolve("doc-index");
docIndexDir = tempDirProvider.get().resolve("doc-index");
Files.createDirectories(docIndexDir);

final boolean validIndex = isValidIndex(docIndexDir, analyzer);
@@ -203,23 +211,39 @@ private boolean isValidIndex(final Path dir, final Analyzer analyzer) {

@Override
public void onChange(final EntityEvent event) {
try {
if (event != null && event.getDocRef() != null && event.getDocRef().getType() != null) {
if (indexableMap.containsKey(event.getDocRef().getType())) {
NullSafe.consume(event, EntityEvent::getDocRef, DocRef::getType, type -> {
if (indexableMap.containsKey(type)) {
try {
changes.put(event);
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
}
}
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
throw new UncheckedInterruptedException(e);
}
});
}
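
The rewritten onChange collapses the nested null checks into a single NullSafe.consume call, which walks a chain of getters and only invokes the consumer when every step is non-null. A minimal sketch of that idea, assuming a simplified two-getter overload rather than stroom's actual NullSafe API:

    import java.util.function.Consumer;
    import java.util.function.Function;

    final class NullSafeSketch {

        // Apply two getters in sequence and hand the result to the consumer,
        // silently short-circuiting if any step yields null.
        static <T, A, B> void consume(final T value,
                                      final Function<T, A> getterA,
                                      final Function<A, B> getterB,
                                      final Consumer<B> consumer) {
            if (value != null) {
                final A a = getterA.apply(value);
                if (a != null) {
                    final B b = getterB.apply(a);
                    if (b != null) {
                        consumer.accept(b);
                    }
                }
            }
        }
    }

With a helper of this shape, the handler reads as one consume(event, EntityEvent::getDocRef, DocRef::getType, type -> ...) call, and the interrupt handling moves inside the consumer.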

public void reindex() {
securityContext.asProcessingUser(() -> indexableMap.forEach((type, indexable) -> {
final Set<DocRef> docRefs = indexable.listDocuments();
docRefs.forEach(docRef -> onChange(new EntityEvent(docRef, EntityAction.UPDATE)));
}));
securityContext.asProcessingUser(() -> {
final List<DocRef> docRefs = indexableMap.values()
.stream()
.flatMap(contentIndexable ->
contentIndexable.listDocuments().stream())
.toList();

LOGGER.logDurationIfInfoEnabled(
() -> {
docRefs.stream()
.takeWhile(docRef ->
!Thread.currentThread().isInterrupted())
.map(docRef ->
new EntityEvent(docRef, EntityAction.UPDATE))
.forEach(this::onChange);
},
LogUtil.message("Re-index of {} documents in {}",
docRefs.size(),
docIndexDir.toAbsolutePath().normalize()));
});
}
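
reindex now gathers every indexable DocRef up front, streams them through onChange while the thread remains uninterrupted, and wraps the whole pass in a duration log. A rough plain-Java stand-in for that timing wrapper (LambdaLogger's real signature is only inferred from this call site):

    import java.time.Duration;
    import java.util.function.Supplier;

    final class DurationLogSketch {

        // Run the work, then report how long it took using a lazily built message.
        static void logDuration(final Runnable work, final Supplier<String> message) {
            final long startNanos = System.nanoTime();
            try {
                work.run();
            } finally {
                final Duration elapsed = Duration.ofNanos(System.nanoTime() - startNanos);
                System.out.println(message.get() + " took " + elapsed);
            }
        }
    }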

@Override
@@ -408,55 +432,98 @@ private synchronized void updateIndex(final List<EntityEvent> list) {
TerminateHandlerFactory.NOOP_FACTORY,
taskContext -> {
final IndexWriterConfig indexWriterConfig = new IndexWriterConfig(analyzer);
try (final IndexWriter writer = new IndexWriter(directory,
indexWriterConfig)) {
list.forEach(event -> {
try {
switch (event.getAction()) {
case CREATE -> {
taskContext.info(() -> "Adding: " +
DocRefUtil.createSimpleDocRefString(event.getDocRef()));
addDoc(writer, event.getDocRef());
try (final IndexWriter writer = new IndexWriter(directory, indexWriterConfig)) {
final int totalCount = list.size();
final AtomicInteger count = new AtomicInteger();
list.stream()
.takeWhile(createTaskTerminatedCheck(taskContext))
.forEach(event -> {
count.incrementAndGet();

try {
switch (event.getAction()) {
case CREATE -> {
setTaskContextInfo(
taskContext, event, "Adding", count, totalCount);
addDoc(taskContext, writer, event.getDocRef());
}
case UPDATE -> {
setTaskContextInfo(
taskContext, event, "Updating", count, totalCount);
deleteDoc(writer, event.getDocRef());
addDoc(taskContext, writer, event.getDocRef());
}
case DELETE -> {
setTaskContextInfo(
taskContext, event, "Deleting", count, totalCount);
deleteDoc(writer, event.getDocRef());
}
}
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
case UPDATE -> {
taskContext.info(() -> "Updating: " +
DocRefUtil.createSimpleDocRefString(event.getDocRef()));
deleteDoc(writer, event.getDocRef());
addDoc(writer, event.getDocRef());
}
case DELETE -> {
taskContext.info(() -> "Deleting: " +
DocRefUtil.createSimpleDocRefString(event.getDocRef()));
deleteDoc(writer, event.getDocRef());
}
}
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
});
});

taskContext.info(() -> "Committing");
writer.commit();
LOGGER.logDurationIfDebugEnabled(
ThrowingRunnable.unchecked(writer::commit),
() -> LogUtil.message("Commit {} event(s)", count));

taskContext.info(() -> "Flushing");
writer.flush();
LOGGER.logDurationIfDebugEnabled(
ThrowingRunnable.unchecked(writer::flush),
() -> LogUtil.message("Flush {} event(s)", count));
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
}).run());
}
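
updateIndex drains a batch of entity events into one IndexWriter session, applies an add, update or delete per event, and then commits and flushes once for the whole batch. A self-contained sketch of that batching pattern, using the standard Lucene packages rather than the shaded lucene980 ones above; the "id" and "content" field names are illustrative:

    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.document.Document;
    import org.apache.lucene.document.Field;
    import org.apache.lucene.document.StringField;
    import org.apache.lucene.document.TextField;
    import org.apache.lucene.index.IndexWriter;
    import org.apache.lucene.index.IndexWriterConfig;
    import org.apache.lucene.index.Term;
    import org.apache.lucene.store.Directory;

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    final class BatchIndexSketch {

        // Re-index a batch of (id -> content) entries in one writer session,
        // committing once at the end rather than per document.
        static void applyBatch(final Directory directory,
                               final Map<String, String> upserts,
                               final List<String> deletedIds) throws IOException {
            final IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
            try (IndexWriter writer = new IndexWriter(directory, config)) {
                for (final Map.Entry<String, String> entry : upserts.entrySet()) {
                    final Document doc = new Document();
                    doc.add(new StringField("id", entry.getKey(), Field.Store.YES));
                    doc.add(new TextField("content", entry.getValue(), Field.Store.NO));
                    // updateDocument removes any existing doc with the same id term, then adds.
                    writer.updateDocument(new Term("id", entry.getKey()), doc);
                }
                for (final String id : deletedIds) {
                    writer.deleteDocuments(new Term("id", id));
                }
                writer.commit();
            }
        }
    }

Committing once per batch avoids paying the commit cost per changed document.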

private void addDoc(final IndexWriter writer, final DocRef docRef) {
private static Predicate<Object> createTaskTerminatedCheck(final TaskContext taskContext) {
return obj -> {
if (taskContext.isTerminated()) {
LOGGER.info("Task is terminated: '{}'", taskContext);
return false;
} else if (Thread.currentThread().isInterrupted()) {
LOGGER.info("Task thread is interrupted: '{}'", taskContext);
return false;
} else {
return true;
}
};
}
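
createTaskTerminatedCheck packages the decision to keep going as a Predicate, so Stream.takeWhile can stop a bulk operation cleanly when the task is terminated or the thread is interrupted. A stripped-down illustration of the same pattern, with a hypothetical CANCELLED flag standing in for TaskContext#isTerminated:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Predicate;

    final class TakeWhileCancelSketch {

        // Hypothetical stand-in for TaskContext#isTerminated.
        static final AtomicBoolean CANCELLED = new AtomicBoolean(false);

        // The predicate ignores the stream element; it only checks the cancel/interrupt state.
        static Predicate<Object> notCancelled() {
            return obj -> !CANCELLED.get() && !Thread.currentThread().isInterrupted();
        }

        public static void main(final String[] args) {
            final List<String> work = List.of("a", "b", "c", "d");
            work.stream()
                    .takeWhile(notCancelled())   // stops pulling items once cancelled
                    .forEach(item -> System.out.println("processing " + item));
        }
    }

Because takeWhile re-evaluates the predicate before each element, the bulk loops above stop promptly instead of running to the end of the list.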

private static void setTaskContextInfo(final TaskContext taskContext,
final EntityEvent event,
final String action,
final AtomicInteger count,
final int totalCount) {
taskContext.info(
() -> LogUtil.message("{}: {} ({} of {})",
action,
DocRefUtil.createSimpleDocRefString(event.getDocRef()),
count,
totalCount),
LOGGER);
}

private void addDoc(final TaskContext taskContext, final IndexWriter writer, final DocRef docRef) {
final ContentIndexable contentIndexable = indexableMap.get(docRef.getType());
if (contentIndexable != null) {
final Map<String, String> dataMap = contentIndexable.getIndexableData(docRef);
if (dataMap != null && !dataMap.isEmpty()) {
dataMap.forEach((extension, data) -> {
try {
index(writer, docRef, extension, data);
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
});
if (NullSafe.hasEntries(dataMap)) {
dataMap.entrySet()
.stream()
.takeWhile(createTaskTerminatedCheck(taskContext))
.forEach(entry -> {
final String extension = entry.getKey();
final String data = entry.getValue();
try {
index(writer, docRef, extension, data);
} catch (final Exception e) {
LOGGER.error(e::getMessage, e);
}
});
}
}
}
-------------------- File 3 of 8 --------------------
@@ -25,16 +25,7 @@
import java.util.function.Predicate;
import java.util.regex.Pattern;

public class CompiledIncludeExcludeFilter implements Predicate<String> {

private final Filter filter;
private final Predicate<String> predicate;

private CompiledIncludeExcludeFilter(final Filter filter,
final Predicate<String> predicate) {
this.filter = filter;
this.predicate = predicate;
}
public class CompiledIncludeExcludeFilter {

public static Optional<Predicate<String>> create(final Filter filter, final Map<String, String> paramMap) {
if (filter == null) {
@@ -73,14 +64,6 @@ public static Optional<Predicate<String>> create(final Filter filter, final Map<
return optional;
}

@Override
public boolean test(final String value) {
final String v = value == null
? ""
: value;
return predicate.test(v);
}

private static List<Pattern> createPatternList(final String patterns, final Map<String, String> paramMap) {
List<Pattern> patternList = null;
if (patterns != null && !patterns.trim().isEmpty()) {
@@ -97,9 +80,4 @@ private static List<Pattern> createPatternList(final String patterns, final Map<

return patternList;
}

@Override
public String toString() {
return filter.toString();
}
}
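
CompiledIncludeExcludeFilter no longer implements Predicate<String> itself; the static create method simply builds and returns an Optional<Predicate<String>>, empty when no include/exclude patterns are configured. A condensed sketch of that factory shape; the pattern handling here is illustrative, not the class's exact logic:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Optional;
    import java.util.function.Predicate;
    import java.util.regex.Pattern;

    final class IncludeExcludeSketch {

        // Build a predicate that passes values matching any include pattern
        // and rejects values matching any exclude pattern.
        static Optional<Predicate<String>> create(final String includeCsv, final String excludeCsv) {
            final List<Pattern> includes = compile(includeCsv);
            final List<Pattern> excludes = compile(excludeCsv);
            if (includes.isEmpty() && excludes.isEmpty()) {
                return Optional.empty();
            }
            return Optional.of(value -> {
                final String v = value == null ? "" : value;
                final boolean included = includes.isEmpty()
                        || includes.stream().anyMatch(p -> p.matcher(v).find());
                final boolean excluded = excludes.stream().anyMatch(p -> p.matcher(v).find());
                return included && !excluded;
            });
        }

        private static List<Pattern> compile(final String csv) {
            if (csv == null || csv.trim().isEmpty()) {
                return List.of();
            }
            return Arrays.stream(csv.split(","))
                    .map(String::trim)
                    .filter(s -> !s.isEmpty())
                    .map(Pattern::compile)
                    .toList();
        }
    }

Returning Optional.empty() lets callers skip the test entirely when no filter is configured, instead of paying for a pass-through predicate.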
-------------------- File 4 of 8 --------------------
@@ -6,6 +6,7 @@
import stroom.query.language.functions.Val;
import stroom.query.language.functions.ref.StoredValues;
import stroom.query.language.functions.ref.ValueReferenceIndex;
import stroom.util.NullSafe;

import com.google.common.base.Predicates;

@@ -100,12 +101,15 @@ public static Predicate<Val[]> create(final ExpressionOperator rowExpression,
for (final UsedColumn usedColumn : usedColumns) {
final Generator generator = usedColumn.generator;
generator.set(values, storedValues);
final String value = generator.eval(storedValues, null).toString();
final Val val = generator.eval(storedValues, null);
final String text = NullSafe.getOrElse(val, Val::toString, "");
// As soon as we fail a predicate test for a column then return false.
if (!usedColumn.columnIncludeExcludePredicate.test(value)) {
if (!usedColumn.columnIncludeExcludePredicate.test(text)) {
return false;
}
usedColumn.mapConsumer.accept(columnNameToValueMap, value);
// TODO : Provide a map of Val to the row predicate as opposed to strings.
// Fix the row expression matcher to deal with Vals.
usedColumn.mapConsumer.accept(columnNameToValueMap, val.toString());
}

// Test the row value map.
-------------------- File 5 of 8 --------------------
@@ -1,5 +1,5 @@
/*
* Copyright 2017 Crown Copyright
* Copyright 2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -187,11 +187,7 @@ String getName() {


String getInfo() {
final Supplier<String> messageSupplier = this.messageSupplier;
if (messageSupplier != null) {
return messageSupplier.get();
}
return "";
return NullSafe.getOrElse(messageSupplier, Supplier::get, "");
}

synchronized void addChild(final TaskContextImpl taskContext) {
@@ -211,6 +207,6 @@ Set<TaskContextImpl> getChildren() {

@Override
public String toString() {
return getInfo();
return name + " - " + getInfo();
}
}
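
Both getInfo here and the Val handling in the row-filter change swap explicit null checks for NullSafe.getOrElse, which maps a possibly-null value and falls back to a default. A minimal sketch of the helper's shape; an approximation, not stroom's actual implementation:

    import java.util.function.Function;

    final class GetOrElseSketch {

        // Map value with getter if both value and the mapped result are non-null,
        // otherwise return the supplied fallback.
        static <T, R> R getOrElse(final T value, final Function<T, R> getter, final R other) {
            if (value == null) {
                return other;
            }
            final R result = getter.apply(value);
            return result != null ? result : other;
        }

        public static void main(final String[] args) {
            // Mirrors getOrElse(messageSupplier, Supplier::get, "") returning "" when no supplier is set.
            System.out.println(getOrElse((String) null, String::trim, "<none>"));
        }
    }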
-------------------- File 6 of 8 --------------------
@@ -1,5 +1,5 @@
/*
* Copyright 2016 Crown Copyright
* Copyright 2016-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -16,15 +16,18 @@

package stroom.task.impl;

import stroom.util.NullSafe;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class TaskThreadInfoUtil {

public static String getInfo(final Collection<TaskContextImpl> taskContexts) {
if (taskContexts == null || taskContexts.size() == 0) {
if (NullSafe.isEmptyCollection(taskContexts)) {
return "";
}

@@ -54,7 +57,7 @@ public static String getInfo(final Collection<TaskContextImpl> taskContexts) {

private static void addLevel(final StringBuilder sb, final Map<TaskContextImpl, Set<TaskContextImpl>> map,
final Set<TaskContextImpl> list, final String prefix) {
if (list != null && list.size() > 0) {
if (NullSafe.hasItems(list)) {
for (final TaskContextImpl taskContext : list) {
// Indent the message if needed.
sb.append(prefix);
Expand Down