OAK-11139 Allow downloading only recently changed nodes from MongoDB (#1734)

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB

* OAK-11139 Allow downloading only recently changed nodes from MongoDB
thomasmueller authored Oct 15, 2024
1 parent 0ed73ed commit e80723e
Showing 12 changed files with 511 additions and 59 deletions.
@@ -77,11 +77,25 @@ public class IndexerSupport {
private final String checkpoint;
private File existingDataDumpDir;

/**
* The lower bound of the "_modified" property, when using the document node
* store.
*/
private long minModified;

public IndexerSupport(IndexHelper indexHelper, String checkpoint) {
this.indexHelper = indexHelper;
this.checkpoint = checkpoint;
}

public long getMinModified() {
return minModified;
}

public void setMinModified(long minModified) {
this.minModified = minModified;
}

public IndexerSupport withExistingDataDumpDir(File existingDataDumpDir) {
this.existingDataDumpDir = existingDataDumpDir;
return this;
@@ -202,6 +202,7 @@ private List<IndexStore> buildFlatFileStoreList(NodeState checkpointedState,
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog))
.withCheckpoint(indexerSupport.getCheckpoint())
.withMinModified(indexerSupport.getMinModified())
.withStatisticsProvider(indexHelper.getStatisticsProvider())
.withIndexingReporter(reporter)
.withAheadOfTimeBlobDownloader(true);
@@ -110,6 +110,7 @@ public class FlatFileNodeStoreBuilder {
private MongoDatabase mongoDatabase = null;
private Set<IndexDefinition> indexDefinitions = null;
private String checkpoint;
private long minModified;
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
private IndexingReporter indexingReporter = IndexingReporter.NOOP;
private MongoClientURI mongoClientURI;
@@ -191,6 +192,18 @@ public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
return this;
}

/**
* Use the given lower bound of the "_modified" property, when using the document node
* store.
*
* @param minModified the minimum value of the "_modified" property
* @return this
*/
public FlatFileNodeStoreBuilder withMinModified(long minModified) {
this.minModified = minModified;
return this;
}
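
// Usage note (illustration, not part of this commit): the value is in seconds
// since the epoch, matching the "_modified" field, and 0 (the default) means
// no lower bound. For example, to download only nodes changed within the last
// day (variable names assumed):
// long oneDayAgoSec = System.currentTimeMillis() / 1000 - 24 * 60 * 60;
// builder.withCheckpoint(checkpoint).withMinModified(oneDayAgoSec);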

public FlatFileNodeStoreBuilder withMongoClientURI(MongoClientURI mongoClientURI) {
this.mongoClientURI = mongoClientURI;
return this;
@@ -409,7 +422,7 @@ IndexStoreSortStrategy createSortStrategy(File dir) {
indexingReporter.setIndexNames(indexNames);
return new PipelinedTreeStoreStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
- statisticsProvider, indexingReporter);
+ minModified, statisticsProvider, indexingReporter);
}
}
throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
@@ -436,4 +449,5 @@ File createStoreDir() throws IOException {
public File getFlatFileStoreDir() {
return flatFileStoreDir;
}

}
@@ -199,7 +199,6 @@ public String toString() {
private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
private static final BsonDocument NATURAL_HINT = BsonDocument.parse("{ $natural: 1 }");
private static final BsonDocument ID_INDEX_HINT = BsonDocument.parse("{ _id: 1 }");
- private static final Bson WITH_MODIFIED_FIELD = Filters.gte(NodeDocument.MODIFIED_IN_SECS, 0);

static final String THREAD_NAME_PREFIX = "mongo-dump";

@@ -228,6 +227,7 @@ public String toString() {
private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
private final DownloadStageStatistics downloadStageStatistics = new DownloadStageStatistics();
private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
private final long minModified;

public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
MongoDocumentStore docStore,
@@ -236,8 +236,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
BlockingQueue<NodeDocument[]> queue,
List<PathFilter> pathFilters,
StatisticsProvider statisticsProvider,
- IndexingReporter reporter) {
- this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments, queue, pathFilters, statisticsProvider, reporter, new ThreadFactoryBuilder().setDaemon(true).build());
+ IndexingReporter reporter,
+ ThreadFactory threadFactory) {
+ this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments,
+ queue, pathFilters, statisticsProvider, reporter, threadFactory, 0);
}

public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
@@ -248,7 +250,8 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
List<PathFilter> pathFilters,
StatisticsProvider statisticsProvider,
IndexingReporter reporter,
- ThreadFactory threadFactory) {
+ ThreadFactory threadFactory,
+ long minModified) {
this.mongoClientURI = mongoClientURI;
this.docStore = docStore;
this.statisticsProvider = statisticsProvider;
@@ -258,6 +261,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
this.mongoDocQueue = queue;
this.pathFilters = pathFilters;
this.threadFactory = threadFactory;
this.minModified = minModified;

// Default retries for 5 minutes.
this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
@@ -328,6 +332,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
}
}

private Bson getModifiedFieldFilter() {
return Filters.gte(NodeDocument.MODIFIED_IN_SECS, minModified);
}
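
// For reference: NodeDocument.MODIFIED_IN_SECS is the "_modified" field, so
// with e.g. minModified = 1700000000 this filter is equivalent to the Mongo
// query (illustrative value, comment not part of this commit):
// { "_modified": { "$gte": 1700000000 } }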

@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
@@ -402,11 +410,10 @@ private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutE
// inefficient. So we pass the natural hint to force MongoDB to use natural ordering, that is, column scan
MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);

if (mongoFilter == null) {
LOG.info("Downloading full repository from Mongo with natural order");
FindIterable<NodeDocument> mongoIterable = dbCollection
- .find(WITH_MODIFIED_FIELD) // Download only documents that have _modified set
+ .find(getModifiedFieldFilter()) // Download only documents that have _modified set
.hint(NATURAL_HINT);
DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
downloadTask.download(mongoIterable);
@@ -419,7 +426,7 @@ private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutE
DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
LOG.info("Downloading from Mongo with natural order using filter: {}", mongoFilter);
FindIterable<NodeDocument> findIterable = dbCollection
- .find(Filters.and(WITH_MODIFIED_FIELD, mongoFilter))
+ .find(Filters.and(getModifiedFieldFilter(), mongoFilter))
.hint(NATURAL_HINT);
downloadTask.download(findIterable);
downloadTask.reportFinalResults();
@@ -433,6 +440,7 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
// matched by the regex used in the Mongo query, which assumes a prefix of "???:/content/dam"
MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);
mongoFilter = addMinModifiedToMongoFilter(mongoFilter);
if (mongoFilter == null) {
LOG.info("Downloading full repository");
} else {
@@ -564,6 +572,28 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
}
}

/**
* If minModified is set, add this condition to the MongoDB filter.
* If minModified is not set, the given filter is returned unchanged.
* This method accepts null, and may return null.
*
* @param mongoFilter the previous filter (may be null)
* @return the combined filter (may be null)
*/
private Bson addMinModifiedToMongoFilter(Bson mongoFilter) {
if (minModified == 0) {
// there is no minModified condition: return the unchanged filter
return mongoFilter;
}
Bson minModifiedFilter = getModifiedFieldFilter();
if (mongoFilter == null) {
// there is no previous filter: return the minModified filter
return minModifiedFilter;
}
// combine both filters
return Filters.and(mongoFilter, minModifiedFilter);
}
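
// The three cases above, made concrete (illustrative comment, not part of
// this commit; assuming minModified = 42):
// - minModified == 0: the given mongoFilter is returned unchanged (may be null)
// - mongoFilter == null: { "_modified": { "$gte": 42 } }
// - otherwise: { "$and": [ <mongoFilter>, { "_modified": { "$gte": 42 } } ] }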

private Future<?> submitDownloadTask(ExecutorCompletionService<Void> executor, DownloadTask downloadTask, Bson mongoFilter, String name) {
return executor.submit(() -> {
String originalName = Thread.currentThread().getName();
@@ -123,6 +123,7 @@ public class PipelinedTreeStoreStrategy extends IndexStoreSortStrategyBase {
public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads";
public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2;
public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB";
public static final String OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED = "oak.indexer.pipelined.tree.minModified";
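
// Illustration (not part of this commit): presumably this cutoff is supplied
// as a system property at launch, in seconds since the epoch to match the
// resolution of the "_modified" field, e.g.:
// java -Doak.indexer.pipelined.tree.minModified=1700000000 ...
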
// 0 means autodetect
public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0;
// Between 1 and 100
@@ -179,6 +180,7 @@ private static void prettyPrintTransformStatisticsHistograms(TransformStageStati
private final int mongoDocBatchMaxNumberOfDocuments;
private final int nseBuffersCount;
private final int nseBuffersSizeBytes;
private final long minModified;

private long nodeStateEntriesExtracted;

@@ -201,9 +203,11 @@ public PipelinedTreeStoreStrategy(MongoClientURI mongoClientURI,
Predicate<String> pathPredicate,
List<PathFilter> pathFilters,
String checkpoint,
long minModified,
StatisticsProvider statisticsProvider,
IndexingReporter indexingReporter) {
super(storeDir, algorithm, pathPredicate, preferredPathElements, checkpoint);
this.minModified = minModified;
this.mongoClientURI = mongoClientURI;
this.docStore = documentStore;
this.documentNodeStore = documentNodeStore;
@@ -361,7 +365,7 @@ public File createSortedStoreFile() throws IOException {
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
}

INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore");
INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore with minModified {}", minModified);
Stopwatch start = Stopwatch.createStarted();

@SuppressWarnings("unchecked")
@@ -373,7 +377,9 @@ public File createSortedStoreFile() throws IOException {
mongoDocQueue,
pathFilters,
statisticsProvider,
- indexingReporter
+ indexingReporter,
+ new ThreadFactoryBuilder().setDaemon(true).build(),
+ minModified
));

ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
@@ -18,10 +18,44 @@
*/
package org.apache.jackrabbit.oak.index.indexer.document.incrementalstore;

/**
* The operation to perform. Operations are generated either by running a 'diff',
* or by a 'top up', which downloads recent changes from MongoDB.
*
* When using the 'diff' command, we expect that the node doesn't exist yet for
* the 'add' case, and that the node exists for the 'delete' and 'modify' cases
* (due to the nature of the 'diff'). The operations are then ADD, DELETE,
* MODIFY.
*
* When using the 'top up', we don't know whether the node existed before or
* not; we only get the updated state. That's why we use different operations:
* REMOVE_IF_EXISTS and INSERT_OR_UPDATE.
*/
public enum IncrementalStoreOperand {

// Add a new node;
// log a warning if it already exists.
// (This operation is used by the 'diff' command.)
ADD("A"),

// Delete an existing node;
// log a warning if it doesn't exist.
// (This operation is used by the 'diff' command.)
DELETE("D"),

// Modify an existing node;
// log a warning if it doesn't exist.
// (This operation is used by the 'diff' command.)
MODIFY("M"),
DELETE("D");

// Remove a node if it exists.
// (This operation is used by the 'top up' command.)
REMOVE_IF_EXISTS("R"),

// Add a new node or update an existing node.
// (This operation is used by the 'top up' command.)
INSERT_OR_UPDATE("U");

private final String operand;

IncrementalStoreOperand(String operand) {
@@ -113,7 +113,6 @@ private void mergeIndexStoreFiles() throws IOException {
String baseFFSLine = baseFFSBufferedReader.readLine();
String incrementalFFSLine = incrementalFFSBufferedReader.readLine();


int compared;
while (baseFFSLine != null || incrementalFFSLine != null) {
if (baseFFSLine != null && incrementalFFSLine != null) {
@@ -190,7 +189,7 @@ private String processIncrementalFFSLine(Map<String, IncrementalStoreOperand> en
incrementalFFSLine = incrementalFFSBufferedReader.readLine();
break;
default:
log.error("Wrong operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
log.error("Unsupported operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
throw new RuntimeException("wrong operand in incremental ffs: operand:" + operand + ", line:" + incrementalFFSLine);
}
return incrementalFFSLine;
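
// Sketch (not part of this commit's excerpt): handling for the new 'top up'
// operands could plausibly look like this, assuming the merge target exposes
// put/remove by path (hypothetical API):
// case REMOVE_IF_EXISTS:
//     store.remove(path); // unlike DELETE, absence is not a warning
//     break;
// case INSERT_OR_UPDATE:
//     store.put(path, nodeData); // write the new state whether or not it existed
//     break;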