OAK-11139 Allow downloading only recently changed nodes from MongoDB #1734

Open · wants to merge 5 commits into base: trunk
Changes from all commits
IndexerSupport.java
@@ -76,12 +76,21 @@ public class IndexerSupport {
private File indexDefinitions;
private final String checkpoint;
private File existingDataDumpDir;
private long minModified;

public IndexerSupport(IndexHelper indexHelper, String checkpoint) {
this.indexHelper = indexHelper;
this.checkpoint = checkpoint;
}

public long getMinModified() {
return minModified;
}

public void setMinModified(long minModified) {
this.minModified = minModified;
}

public IndexerSupport withExistingDataDumpDir(File existingDataDumpDir) {
this.existingDataDumpDir = existingDataDumpDir;
return this;
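The new minModified field is a lower bound on the _modified property of the Mongo documents, which the DocumentNodeStore maintains in seconds since the epoch (with 5-second resolution). A minimal sketch of how a caller might derive and set it, assuming an incremental run covering the last 7 days (the cutoff and the indexerSupport instance are illustrative, not part of this PR):

    import java.util.concurrent.TimeUnit;

    // Hypothetical caller: only download nodes changed in the last 7 days.
    // _modified is in seconds, so the millisecond cutoff is divided by 1000;
    // leaving minModified at 0 (the default) keeps the full download.
    long cutoffMillis = System.currentTimeMillis() - TimeUnit.DAYS.toMillis(7);
    indexerSupport.setMinModified(cutoffMillis / 1000);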
@@ -202,6 +202,7 @@ private List<IndexStore> buildFlatFileStoreList(NodeState checkpointedState,
.withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
nodeStore, getMongoDocumentStore(), traversalLog))
.withCheckpoint(indexerSupport.getCheckpoint())
.withMinModified(indexerSupport.getMinModified())
.withStatisticsProvider(indexHelper.getStatisticsProvider())
.withIndexingReporter(reporter)
.withAheadOfTimeBlobDownloader(true);
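The wiring above forwards the bound from IndexerSupport into the flat-file store builder next to the checkpoint it belongs to. A condensed sketch of the relevant part of the chain (other builder options elided; the builder variable is assumed):

    // Both values come from the same IndexerSupport instance, keeping the
    // incremental lower bound consistent with the checkpoint being indexed.
    builder.withCheckpoint(indexerSupport.getCheckpoint())
           .withMinModified(indexerSupport.getMinModified());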
FlatFileNodeStoreBuilder.java
@@ -110,6 +110,7 @@ public class FlatFileNodeStoreBuilder {
private MongoDatabase mongoDatabase = null;
private Set<IndexDefinition> indexDefinitions = null;
private String checkpoint;
private long minModified;
private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
private IndexingReporter indexingReporter = IndexingReporter.NOOP;
private MongoClientURI mongoClientURI;
@@ -191,6 +192,11 @@ public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
return this;
}

public FlatFileNodeStoreBuilder withMinModified(long minModified) {
this.minModified = minModified;
return this;
}

public FlatFileNodeStoreBuilder withMongoClientURI(MongoClientURI mongoClientURI) {
this.mongoClientURI = mongoClientURI;
return this;
@@ -408,7 +414,7 @@ IndexStoreSortStrategy createSortStrategy(File dir) {
indexingReporter.setIndexNames(indexNames);
return new PipelinedTreeStoreStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
- statisticsProvider, indexingReporter);
+ minModified, statisticsProvider, indexingReporter);
}
}
throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
@@ -435,4 +441,5 @@ File createStoreDir() throws IOException {
public File getFlatFileStoreDir() {
return flatFileStoreDir;
}

}
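Within this file the bound only reaches the PIPELINED_TREE sort strategy: createSortStrategy forwards it into the PipelinedTreeStoreStrategy constructor. A usage sketch, assuming the builder's single-argument working-directory constructor and caller-supplied values; since the field defaults to 0, existing callers that never invoke withMinModified keep the previous full-download behavior:

    // Sketch with assumed names; withMinModified(0) leaves the download
    // unrestricted, so the call is safe to make unconditionally.
    FlatFileNodeStoreBuilder builder = new FlatFileNodeStoreBuilder(workDir)
            .withCheckpoint(checkpoint)
            .withMinModified(minModified);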
PipelinedMongoDownloadTask.java
@@ -199,7 +199,6 @@ public String toString() {
private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
private static final BsonDocument NATURAL_HINT = BsonDocument.parse("{ $natural: 1 }");
private static final BsonDocument ID_INDEX_HINT = BsonDocument.parse("{ _id: 1 }");
- private static final Bson WITH_MODIFIED_FIELD = Filters.gte(NodeDocument.MODIFIED_IN_SECS, 0);

static final String THREAD_NAME_PREFIX = "mongo-dump";

@@ -228,6 +227,7 @@ public String toString() {
private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
private final DownloadStageStatistics downloadStageStatistics = new DownloadStageStatistics();
private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
private final long minModified;

public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
MongoDocumentStore docStore,
@@ -236,8 +236,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
BlockingQueue<NodeDocument[]> queue,
List<PathFilter> pathFilters,
StatisticsProvider statisticsProvider,
- IndexingReporter reporter) {
- this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments, queue, pathFilters, statisticsProvider, reporter, new ThreadFactoryBuilder().setDaemon(true).build());
+ IndexingReporter reporter,
+ ThreadFactory threadFactory) {
+ this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments,
+ queue, pathFilters, statisticsProvider, reporter, threadFactory, 0);
}

public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
@@ -248,7 +250,8 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
List<PathFilter> pathFilters,
StatisticsProvider statisticsProvider,
IndexingReporter reporter,
- ThreadFactory threadFactory) {
+ ThreadFactory threadFactory,
+ long minModified) {
this.mongoClientURI = mongoClientURI;
this.docStore = docStore;
this.statisticsProvider = statisticsProvider;
@@ -258,6 +261,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
this.mongoDocQueue = queue;
this.pathFilters = pathFilters;
this.threadFactory = threadFactory;
this.minModified = minModified;

// Default retries for 5 minutes.
this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
@@ -328,6 +332,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
}
}

private Bson getModifiedFieldFilter() {
return Filters.gte(NodeDocument.MODIFIED_IN_SECS, minModified);
}

@Override
public Result call() throws Exception {
String originalName = Thread.currentThread().getName();
@@ -402,11 +410,10 @@ private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutE
// inefficient. So we pass the natural hint to force MongoDB to use natural ordering, that is, a collection scan
MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);

if (mongoFilter == null) {
LOG.info("Downloading full repository from Mongo with natural order");
FindIterable<NodeDocument> mongoIterable = dbCollection
- .find(WITH_MODIFIED_FIELD) // Download only documents that have _modified set
+ .find(getModifiedFieldFilter()) // Download only documents that have _modified set
.hint(NATURAL_HINT);
DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
downloadTask.download(mongoIterable);
@@ -419,7 +426,7 @@
DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
LOG.info("Downloading from Mongo with natural order using filter: {}", mongoFilter);
FindIterable<NodeDocument> findIterable = dbCollection
- .find(Filters.and(WITH_MODIFIED_FIELD, mongoFilter))
+ .find(Filters.and(getModifiedFieldFilter(), mongoFilter))
.hint(NATURAL_HINT);
downloadTask.download(findIterable);
downloadTask.reportFinalResults();
@@ -433,6 +440,7 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
// matched by the regex used in the Mongo query, which assumes a prefix of "???:/content/dam"
MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);
mongoFilter = addMinModifiedToMongoFilter(mongoFilter);
if (mongoFilter == null) {
LOG.info("Downloading full repository");
} else {
@@ -564,6 +572,28 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
}
}

/**
* If minModified is set, add this condition to the MongoDB filter.
* If minModified is not set (0), the given filter is returned unchanged.
* This method accepts null, and may return null.
*
* @param mongoFilter the previous filter (may be null)
* @return the combined filter (may be null)
*/
private Bson addMinModifiedToMongoFilter(Bson mongoFilter) {
if (minModified == 0) {
// there is no minModified condition: return the filter unchanged
return mongoFilter;
}
Bson minModifiedFilter = getModifiedFieldFilter();
if (mongoFilter == null) {
// there is no previous filter: return the minModified filter
return minModifiedFilter;
}
// combine both filters
return Filters.and(mongoFilter, minModifiedFilter);
}

private Future<?> submitDownloadTask(ExecutorCompletionService<Void> executor, DownloadTask downloadTask, Bson mongoFilter, String name) {
return executor.submit(() -> {
String originalName = Thread.currentThread().getName();
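Taken together, getModifiedFieldFilter and addMinModifiedToMongoFilter can only tighten the query, never widen it. A standalone sketch of the resulting filter shape, using the same driver calls as above with assumed values (the regex and the epoch-seconds bound are illustrative, and the string literals stand in for the NodeDocument constants):

    import org.bson.conversions.Bson;
    import com.mongodb.client.model.Filters;

    // With a path filter and a non-zero minModified, the query is an $and:
    //   { $and: [ { _id: { $regex: ... } }, { _modified: { $gte: 1700000000 } } ] }
    Bson pathFilter = Filters.regex("_id", "^[0-9]+:/content/dam($|/)");
    Bson combined = Filters.and(pathFilter, Filters.gte("_modified", 1700000000L));
    // With minModified == 0 the bound degenerates to { _modified: { $gte: 0 } },
    // which matches every document that has _modified set: the old behavior.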
PipelinedTreeStoreStrategy.java
@@ -123,6 +123,7 @@ public class PipelinedTreeStoreStrategy extends IndexStoreSortStrategyBase {
public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads";
public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2;
public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB";
public static final String OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED = "oak.indexer.pipelined.tree.minModified";
// 0 means autodetect
public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0;
// Between 1 and 100
@@ -179,6 +180,7 @@ private static void prettyPrintTransformStatisticsHistograms(TransformStageStati
private final int mongoDocBatchMaxNumberOfDocuments;
private final int nseBuffersCount;
private final int nseBuffersSizeBytes;
private final long minModified;

private long nodeStateEntriesExtracted;

@@ -201,9 +203,11 @@ public PipelinedTreeStoreStrategy(MongoClientURI mongoClientURI,
Predicate<String> pathPredicate,
List<PathFilter> pathFilters,
String checkpoint,
long minModified,
StatisticsProvider statisticsProvider,
IndexingReporter indexingReporter) {
super(storeDir, algorithm, pathPredicate, preferredPathElements, checkpoint);
this.minModified = minModified;
this.mongoClientURI = mongoClientURI;
this.docStore = documentStore;
this.documentNodeStore = documentNodeStore;
@@ -361,7 +365,7 @@ public File createSortedStoreFile() throws IOException {
emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
}

INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore");
INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore with minModified {}", minModified);
Stopwatch start = Stopwatch.createStarted();

@SuppressWarnings("unchecked")
@@ -373,7 +377,9 @@ public File createSortedStoreFile() throws IOException {
mongoDocQueue,
pathFilters,
statisticsProvider,
- indexingReporter
+ indexingReporter,
+ new ThreadFactoryBuilder().setDaemon(true).build(),
+ minModified
));

ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
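OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED defines the configuration key for this bound; the code that reads the property is not part of this diff. A sketch of how a launcher could resolve it with plain JDK system-property access (the wiring around it is assumed):

    // Started with -Doak.indexer.pipelined.tree.minModified=1700000000
    long minModified = Long.getLong(
            PipelinedTreeStoreStrategy.OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED, 0L);
    builder.withMinModified(minModified);  // 0 (the default) means no bound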
IncrementalStoreOperand.java
@@ -19,9 +19,17 @@
package org.apache.jackrabbit.oak.index.indexer.document.incrementalstore;

public enum IncrementalStoreOperand {
+ // entries are ordered alphabetically
+ // add a new node
ADD("A"),
+ // delete an existing node
+ DELETE("D"),
+ // modify an existing node
MODIFY("M"),
- DELETE("D");
+ // remove a node that may (or may not) exist
+ REMOVE("R"),
+ // add or update a new or existing node
+ UPSERT("U");
private final String operand;

IncrementalStoreOperand(String operand) {
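REMOVE and UPSERT differ from DELETE and MODIFY in that they do not assume the node is already present, which allows incremental batches to be applied idempotently. An illustrative consumer-side dispatch (the handler names are assumed, and the real merge logic may differ in detail):

    // op is an IncrementalStoreOperand parsed from an incremental-store line.
    switch (op) {
        case ADD:     // node must not exist yet
        case MODIFY:  // node must already exist
        case UPSERT:  // node may or may not exist: write unconditionally
            writeEntry(path, json);
            break;
        case DELETE:  // node must already exist
        case REMOVE:  // node may or may not exist: drop it if present
            dropEntry(path);
            break;
    }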
@@ -113,7 +113,6 @@ private void mergeIndexStoreFiles() throws IOException {
String baseFFSLine = baseFFSBufferedReader.readLine();
String incrementalFFSLine = incrementalFFSBufferedReader.readLine();


int compared;
while (baseFFSLine != null || incrementalFFSLine != null) {
if (baseFFSLine != null && incrementalFFSLine != null) {
@@ -190,7 +189,7 @@ private String processIncrementalFFSLine(Map<String, IncrementalStoreOperand> en
incrementalFFSLine = incrementalFFSBufferedReader.readLine();
break;
default:
log.error("Wrong operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
log.error("Unsupported operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
throw new RuntimeException("wrong operand in incremental ffs: operand:" + operand + ", line:" + incrementalFFSLine);
}
return incrementalFFSLine;
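mergeIndexStoreFiles is a classic two-way merge of files sorted by path: both readers advance in lock-step and the smaller key is written first. A self-contained sketch of the pattern (simplified: it always writes the incremental line, whereas the real code dispatches on the operand via processIncrementalFFSLine; the pipe-separated line layout and plain String ordering are assumptions):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.Writer;

    static void merge(BufferedReader base, BufferedReader inc, Writer out) throws IOException {
        String b = base.readLine();
        String i = inc.readLine();
        while (b != null || i != null) {
            // An exhausted reader sorts last so the other side drains.
            int cmp = (b == null) ? 1 : (i == null) ? -1
                    : pathOf(b).compareTo(pathOf(i));
            if (cmp < 0) { out.write(b + "\n"); b = base.readLine(); }
            else if (cmp > 0) { out.write(i + "\n"); i = inc.readLine(); }
            else { out.write(i + "\n"); b = base.readLine(); i = inc.readLine(); } // incremental wins
        }
    }

    // Assumed layout: the path is the first pipe-separated field of a line.
    static String pathOf(String line) {
        return line.substring(0, line.indexOf('|'));
    }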