diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
index 18173621f00..a588ca50293 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/IndexerSupport.java
@@ -76,12 +76,21 @@ public class IndexerSupport {
     private File indexDefinitions;
     private final String checkpoint;
    private File existingDataDumpDir;
+    private long minModified;
 
     public IndexerSupport(IndexHelper indexHelper, String checkpoint) {
         this.indexHelper = indexHelper;
         this.checkpoint = checkpoint;
     }
 
+    public long getMinModified() {
+        return minModified;
+    }
+
+    public void setMinModified(long minModified) {
+        this.minModified = minModified;
+    }
+
     public IndexerSupport withExistingDataDumpDir(File existingDataDumpDir) {
         this.existingDataDumpDir = existingDataDumpDir;
         return this;
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
index 1a25acc7c1d..1f298a806ad 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/DocumentStoreIndexerBase.java
@@ -202,6 +202,7 @@ private List buildFlatFileStoreList(NodeState checkpointedState,
                     .withNodeStateEntryTraverserFactory(new MongoNodeStateEntryTraverserFactory(rootDocumentState.getRootRevision(),
                             nodeStore, getMongoDocumentStore(), traversalLog))
                     .withCheckpoint(indexerSupport.getCheckpoint())
+                    .withMinModified(indexerSupport.getMinModified())
                     .withStatisticsProvider(indexHelper.getStatisticsProvider())
                     .withIndexingReporter(reporter)
                     .withAheadOfTimeBlobDownloader(true);
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
index 38793e61a49..448970c9bd2 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/FlatFileNodeStoreBuilder.java
@@ -110,6 +110,7 @@ public class FlatFileNodeStoreBuilder {
     private MongoDatabase mongoDatabase = null;
     private Set indexDefinitions = null;
     private String checkpoint;
+    private long minModified;
     private StatisticsProvider statisticsProvider = StatisticsProvider.NOOP;
     private IndexingReporter indexingReporter = IndexingReporter.NOOP;
     private MongoClientURI mongoClientURI;
@@ -191,6 +192,11 @@ public FlatFileNodeStoreBuilder withCheckpoint(String checkpoint) {
         return this;
     }
 
+    public FlatFileNodeStoreBuilder withMinModified(long minModified) {
+        this.minModified = minModified;
+        return this;
+    }
+
     public FlatFileNodeStoreBuilder withMongoClientURI(MongoClientURI mongoClientURI) {
         this.mongoClientURI = mongoClientURI;
         return this;
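
The hunks above are the whole wiring: IndexerSupport carries the value, DocumentStoreIndexerBase forwards it, and the builder stores it. A minimal caller-side sketch (illustrative only; where the cutoff comes from, e.g. the metadata of a previous dump, is an assumption and not part of this patch):

    // minModified is a cutoff in seconds, the unit of the _modified field:
    // only documents with _modified >= minModified are downloaded.
    // 0 (the default) keeps the full-download behavior.
    long cutoffSeconds = 1_700_000_000L; // hypothetical value
    IndexerSupport indexerSupport = new IndexerSupport(indexHelper, checkpoint);
    indexerSupport.setMinModified(cutoffSeconds);
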
@@ -408,7 +414,7 @@ IndexStoreSortStrategy createSortStrategy(File dir) {
                 indexingReporter.setIndexNames(indexNames);
                 return new PipelinedTreeStoreStrategy(mongoClientURI, mongoDocumentStore, nodeStore, rootRevision,
                         preferredPathElements, blobStore, dir, algorithm, pathPredicate, pathFilters, checkpoint,
-                        statisticsProvider, indexingReporter);
+                        minModified, statisticsProvider, indexingReporter);
             }
         }
         throw new IllegalStateException("Not a valid sort strategy value " + sortStrategyType);
@@ -435,4 +441,5 @@ File createStoreDir() throws IOException {
     public File getFlatFileStoreDir() {
         return flatFileStoreDir;
     }
+
 }
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
index 2c7c78cd75c..5ede5d0d3da 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedMongoDownloadTask.java
@@ -199,7 +199,6 @@ public String toString() {
     private static final int MIN_INTERVAL_BETWEEN_DELAYED_ENQUEUING_MESSAGES = 10;
     private static final BsonDocument NATURAL_HINT = BsonDocument.parse("{ $natural: 1 }");
     private static final BsonDocument ID_INDEX_HINT = BsonDocument.parse("{ _id: 1 }");
-    private static final Bson WITH_MODIFIED_FIELD = Filters.gte(NodeDocument.MODIFIED_IN_SECS, 0);
 
     static final String THREAD_NAME_PREFIX = "mongo-dump";
 
@@ -228,6 +227,7 @@ public String toString() {
     private final Stopwatch downloadStartWatch = Stopwatch.createUnstarted();
     private final DownloadStageStatistics downloadStageStatistics = new DownloadStageStatistics();
     private Instant lastDelayedEnqueueWarningMessageLoggedTimestamp = Instant.now();
+    private final long minModified;
 
     public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
                                       MongoDocumentStore docStore,
@@ -236,8 +236,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
                                       BlockingQueue queue,
                                       List pathFilters,
                                       StatisticsProvider statisticsProvider,
-                                      IndexingReporter reporter) {
-        this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments, queue, pathFilters, statisticsProvider, reporter, new ThreadFactoryBuilder().setDaemon(true).build());
+                                      IndexingReporter reporter,
+                                      ThreadFactory threadFactory) {
+        this(mongoClientURI, docStore, maxBatchSizeBytes, maxBatchNumberOfDocuments,
+                queue, pathFilters, statisticsProvider, reporter, threadFactory, 0);
     }
 
     public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
@@ -248,7 +250,8 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
                                       List pathFilters,
                                       StatisticsProvider statisticsProvider,
                                       IndexingReporter reporter,
-                                      ThreadFactory threadFactory) {
+                                      ThreadFactory threadFactory,
+                                      long minModified) {
         this.mongoClientURI = mongoClientURI;
         this.docStore = docStore;
         this.statisticsProvider = statisticsProvider;
@@ -258,6 +261,7 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
         this.mongoDocQueue = queue;
         this.pathFilters = pathFilters;
         this.threadFactory = threadFactory;
+        this.minModified = minModified;
 
         // Default retries for 5 minutes.
         this.retryDuringSeconds = ConfigHelper.getSystemPropertyAsInt(
@@ -328,6 +332,10 @@ public PipelinedMongoDownloadTask(MongoClientURI mongoClientURI,
         }
     }
 
+    private Bson getModifiedFieldFilter() {
+        return Filters.gte(NodeDocument.MODIFIED_IN_SECS, minModified);
+    }
+
     @Override
     public Result call() throws Exception {
         String originalName = Thread.currentThread().getName();
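
The new getModifiedFieldFilter() generalizes the WITH_MODIFIED_FIELD constant removed above. A sketch of the query it produces (this relies on the MongoDB Java driver's Filters API; in Oak, NodeDocument.MODIFIED_IN_SECS is the "_modified" field):

    Bson filter = Filters.gte(NodeDocument.MODIFIED_IN_SECS, minModified);
    // renders as: { "_modified": { "$gte": <minModified> } }
    // With minModified == 0 this matches every document that has _modified set,
    // which is exactly what WITH_MODIFIED_FIELD matched before.
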
@@ -402,11 +410,10 @@ private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutE
         // inefficient. So we pass the natural hint to force MongoDB to use natural ordering, that is, collection scan
         MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
         Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);
-
         if (mongoFilter == null) {
             LOG.info("Downloading full repository from Mongo with natural order");
             FindIterable mongoIterable = dbCollection
-                    .find(WITH_MODIFIED_FIELD) // Download only documents that have _modified set
+                    .find(getModifiedFieldFilter()) // Download only documents that have _modified set
                     .hint(NATURAL_HINT);
             DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
             downloadTask.download(mongoIterable);
@@ -419,7 +426,7 @@ private void downloadWithNaturalOrdering() throws InterruptedException, TimeoutE
             DownloadTask downloadTask = new DownloadTask(DownloadOrder.UNDEFINED, downloadStageStatistics);
             LOG.info("Downloading from Mongo with natural order using filter: {}", mongoFilter);
             FindIterable findIterable = dbCollection
-                    .find(Filters.and(WITH_MODIFIED_FIELD, mongoFilter))
+                    .find(Filters.and(getModifiedFieldFilter(), mongoFilter))
                     .hint(NATURAL_HINT);
             downloadTask.download(findIterable);
             downloadTask.reportFinalResults();
@@ -433,6 +440,7 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
         // matched by the regex used in the Mongo query, which assumes a prefix of "???:/content/dam"
         MongoFilterPaths mongoFilterPaths = getPathsForRegexFiltering();
         Bson mongoFilter = MongoDownloaderRegexUtils.computeMongoQueryFilter(mongoFilterPaths, customExcludeEntriesRegex);
+        mongoFilter = addMinModifiedToMongoFilter(mongoFilter);
         if (mongoFilter == null) {
             LOG.info("Downloading full repository");
         } else {
@@ -564,6 +572,28 @@ private void downloadWithRetryOnConnectionErrors() throws InterruptedException,
         }
     }
 
+    /**
+     * If minModified is set, adds this condition to the given MongoDB filter.
+     * If minModified is not set, the filter is returned unchanged.
+     * This method accepts null, and may return null.
+     *
+     * @param mongoFilter the previous filter (may be null)
+     * @return the combined filter (may be null)
+     */
+    private Bson addMinModifiedToMongoFilter(Bson mongoFilter) {
+        if (minModified == 0) {
+            // there is no minModified condition: return the unchanged filter
+            return mongoFilter;
+        }
+        Bson minModifiedFilter = getModifiedFieldFilter();
+        if (mongoFilter == null) {
+            // there is no previous filter: return the minModified filter
+            return minModifiedFilter;
+        }
+        // combine both filters
+        return Filters.and(mongoFilter, minModifiedFilter);
+    }
+
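
The null contract of addMinModifiedToMongoFilter, summarized (derived from the method above):

    // minModified == 0, mongoFilter == null  ->  null          (full download)
    // minModified == 0, mongoFilter != null  ->  mongoFilter   (unchanged)
    // minModified != 0, mongoFilter == null  ->  { _modified: { $gte: minModified } }
    // minModified != 0, mongoFilter != null  ->  Filters.and(mongoFilter, minModifiedFilter)
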
     private Future submitDownloadTask(ExecutorCompletionService executor, DownloadTask downloadTask, Bson mongoFilter, String name) {
         return executor.submit(() -> {
             String originalName = Thread.currentThread().getName();
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreStrategy.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreStrategy.java
index b5cb0371136..64114caabfa 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreStrategy.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreStrategy.java
@@ -123,6 +123,7 @@ public class PipelinedTreeStoreStrategy extends IndexStoreSortStrategyBase {
     public static final String OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = "oak.indexer.pipelined.transformThreads";
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_TRANSFORM_THREADS = 2;
     public static final String OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = "oak.indexer.pipelined.workingMemoryMB";
+    public static final String OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED = "oak.indexer.pipelined.tree.minModified";
     // 0 means autodetect
     public static final int DEFAULT_OAK_INDEXER_PIPELINED_WORKING_MEMORY_MB = 0;
     // Between 1 and 100
@@ -179,6 +180,7 @@ private static void prettyPrintTransformStatisticsHistograms(TransformStageStati
     private final int mongoDocBatchMaxNumberOfDocuments;
     private final int nseBuffersCount;
     private final int nseBuffersSizeBytes;
+    private final long minModified;
 
     private long nodeStateEntriesExtracted;
 
@@ -201,9 +203,11 @@ public PipelinedTreeStoreStrategy(MongoClientURI mongoClientURI,
                                       Predicate pathPredicate,
                                       List pathFilters,
                                       String checkpoint,
+                                      long minModified,
                                       StatisticsProvider statisticsProvider,
                                       IndexingReporter indexingReporter) {
         super(storeDir, algorithm, pathPredicate, preferredPathElements, checkpoint);
+        this.minModified = minModified;
         this.mongoClientURI = mongoClientURI;
         this.docStore = documentStore;
         this.documentNodeStore = documentNodeStore;
@@ -361,7 +365,7 @@ public File createSortedStoreFile() throws IOException {
             emptyBatchesQueue.add(NodeStateEntryBatch.createNodeStateEntryBatch(nseBuffersSizeBytes, Integer.MAX_VALUE));
         }
 
-        INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore");
+        INDEXING_PHASE_LOGGER.info("[TASK:PIPELINED-DUMP:START] Starting to build TreeStore with minModified {}", minModified);
         Stopwatch start = Stopwatch.createStarted();
         @SuppressWarnings("unchecked")
@@ -373,7 +377,9 @@ public File createSortedStoreFile() throws IOException {
                 mongoDocQueue,
                 pathFilters,
                 statisticsProvider,
-                indexingReporter
+                indexingReporter,
+                new ThreadFactoryBuilder().setDaemon(true).build(),
+                minModified
         ));
 
         ArrayList<Future<PipelinedTransformTask.Result>> transformFutures = new ArrayList<>(numberOfTransformThreads);
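
The hunks above only define the OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED constant; nothing shown here reads it yet. One plausible way for a caller to resolve the property (an assumption, not part of this patch):

    // property value is in epoch seconds; 0 disables the cutoff
    long minModified = Long.getLong(OAK_INDEXER_PIPELINED_TREE_MIN_MODIFIED, 0L);
    flatFileNodeStoreBuilder.withMinModified(minModified);
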
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreOperand.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreOperand.java
index 7e629806e6f..2db5aebf66f 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreOperand.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/IncrementalStoreOperand.java
@@ -19,9 +19,17 @@ package org.apache.jackrabbit.oak.index.indexer.document.incrementalstore;
 
 public enum IncrementalStoreOperand {
+    // entries are ordered alphabetically
+    // add a new node
     ADD("A"),
+    // delete an existing node
+    DELETE("D"),
+    // modify an existing node
     MODIFY("M"),
-    DELETE("D");
+    // remove a node that may (or may not) exist
+    REMOVE("R"),
+    // add a new node or update an existing node
+    UPSERT("U");
 
     private final String operand;
 
     IncrementalStoreOperand(String operand) {
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
index 485105fa18c..26247c5ae37 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalFlatFileStore.java
@@ -113,7 +113,6 @@ private void mergeIndexStoreFiles() throws IOException {
             String baseFFSLine = baseFFSBufferedReader.readLine();
             String incrementalFFSLine = incrementalFFSBufferedReader.readLine();
 
-            int compared;
             while (baseFFSLine != null || incrementalFFSLine != null) {
                 if (baseFFSLine != null && incrementalFFSLine != null) {
@@ -190,7 +189,7 @@ private String processIncrementalFFSLine(Map en
                     incrementalFFSLine = incrementalFFSBufferedReader.readLine();
                     break;
                 default:
-                    log.error("Wrong operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
+                    log.error("Unsupported operand in incremental ffs: operand:{}, line:{}", operand, incrementalFFSLine);
                     throw new RuntimeException("wrong operand in incremental ffs: operand:" + operand + ", line:" + incrementalFFSLine);
         }
         return incrementalFFSLine;
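
With the two tolerant operands added above, an incremental line (parts[0] = path, parts[1] = serialized node state, parts[3] = operand) is applied to a tree store as follows; this is condensed from updateIndexStore in MergeIncrementalTreeStore, introduced further down:

    switch (line.operation) {
    case ADD:      // A: warns if the node already exists
    case MODIFY:   // M: warns if the node does not exist yet
    case UPSERT:   // U: insert or update, no existence check
        treeStore.putNode(line.path, line.value);
        break;
    case DELETE:   // D: warns if the node does not exist
    case REMOVE:   // R: remove if present, silent otherwise
        treeStore.removeNode(line.path);
        break;
    }
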
diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalTreeStore.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalTreeStore.java
index 2e87d49e727..dc39597543f 100644
--- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalTreeStore.java
+++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/incrementalstore/MergeIncrementalTreeStore.java
@@ -46,6 +46,7 @@
 public class MergeIncrementalTreeStore implements MergeIncrementalStore {
 
+    public static final String TOPUP_FILE = "topup.lz4";
     private static final String MERGE_BASE_AND_INCREMENTAL_TREE_STORE = "MergeBaseAndIncrementalTreeStore";
     private static final ObjectMapper JSON_MAPPER = new ObjectMapper();
     private static final Logger LOG = LoggerFactory.getLogger(MergeIncrementalTreeStore.class);
@@ -67,16 +68,29 @@ public MergeIncrementalTreeStore(File baseFile, File incrementalFile, File merge
 
     @Override
     public void doMerge() throws IOException {
-        LOG.info("Merging {} and {}", baseFile.getAbsolutePath(), incrementalFile.getAbsolutePath());
         File baseDir = new File(baseFile.getAbsolutePath() + ".files");
-        LOG.info("Unpacking to {}", baseDir.getAbsolutePath());
+        LOG.info("Unpacking {} to {}", baseFile.getAbsolutePath(), baseDir.getAbsolutePath());
         FilePacker.unpack(baseFile, baseDir, true);
-        File mergedDir = new File(mergedFile.getAbsolutePath() + ".files");
-        LOG.info("Merging to {}", mergedDir.getAbsolutePath());
+        File topup = new File(incrementalFile.getParent(), TOPUP_FILE);
+        if (topup.exists()) {
+            LOG.info("Merging diff {}", incrementalFile.getAbsolutePath());
+            updateIndexStore(baseDir, incrementalFile, algorithm);
+            LOG.info("Merging topup {}", topup.getAbsolutePath());
+            updateIndexStore(baseDir, topup, algorithm);
+            LOG.info("Packing to {}", mergedFile.getAbsolutePath());
+            FilePacker.pack(baseDir, TreeSession.getFileNameRegex(), mergedFile, true);
+        } else {
+            File mergedDir = new File(mergedFile.getAbsolutePath() + ".files");
+            LOG.info("Merging {} and {} to {}",
+                    baseDir.getAbsolutePath(),
+                    incrementalFile.getAbsolutePath(),
+                    mergedDir.getAbsolutePath());
+            mergeIndexStore(baseDir, incrementalFile, algorithm, mergedDir);
+            LOG.info("Packing to {}", mergedFile.getAbsolutePath());
+            FilePacker.pack(mergedDir, TreeSession.getFileNameRegex(), mergedFile, true);
+        }
+        LOG.info("Merging metadata");
         mergeMetadataFiles();
-        mergeIndexStore(baseDir, mergedDir);
-        LOG.info("Packing to {}", mergedFile.getAbsolutePath());
-        FilePacker.pack(mergedDir, TreeSession.getFileNameRegex(), mergedFile, true);
         LOG.info("Completed");
     }
 
@@ -85,14 +99,63 @@ public String getStrategyName() {
         return MERGE_BASE_AND_INCREMENTAL_TREE_STORE;
     }
 
-    /**
-     * Merges multiple index store files.
-     *
-     * This method is a little verbose, but I think this is fine
-     * as we are not getting consistent data from checkpoint diff
-     * and we need to handle cases differently.
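
A caller-side sketch of the new merge flow (the file names and the workDir variable are assumptions; the constructor signature is the one visible in the hunk context above):

    File base = new File(workDir, "base-store.lz4");         // hypothetical names
    File incremental = new File(workDir, "incremental.lz4");
    File merged = new File(workDir, "merged.lz4");
    // If a sibling topup.lz4 exists next to the incremental file, doMerge()
    // unpacks the base tree store and applies the diff and the topup in place;
    // otherwise it falls back to the two-store merge of base and incremental.
    new MergeIncrementalTreeStore(base, incremental, merged, algorithm).doMerge();
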
-     */
-    private void mergeIndexStore(File baseDir, File mergedDir) throws IOException {
+    private static void updateIndexStore(File treeStoreDir, File incrementalFile, Compression algorithm) throws IOException {
+        TreeStore treeStore = new TreeStore("treeStore", treeStoreDir, new NodeStateEntryReader(null), 10);
+        long added = 0, modified = 0, upserted = 0, deleted = 0, removed = 0;
+        try (BufferedReader incrementalReader = IndexStoreUtils.createReader(incrementalFile, algorithm)) {
+            while (true) {
+                StoreEntry line = StoreEntry.readFromReader(incrementalReader);
+                if (line == null) {
+                    break;
+                }
+                String old = treeStore.getSession().get(line.path);
+                switch (line.operation) {
+                case ADD:
+                    added++;
+                    if (old != null) {
+                        LOG.warn(
+                                "ADD: node {} already exists with {}; updating with {}",
+                                line.path, old, line.value);
+                    }
+                    treeStore.putNode(line.path, line.value);
+                    break;
+                case MODIFY:
+                    modified++;
+                    if (old == null) {
+                        LOG.warn(
+                                "MODIFY: node {} doesn't exist yet; updating with {}",
+                                line.path, line.value);
+                    }
+                    treeStore.putNode(line.path, line.value);
+                    break;
+                case UPSERT:
+                    upserted++;
+                    // upsert = insert or update
+                    treeStore.putNode(line.path, line.value);
+                    break;
+                case DELETE:
+                    deleted++;
+                    if (old == null) {
+                        LOG.warn(
+                                "DELETE: node {} doesn't exist",
+                                line.path);
+                    }
+                    treeStore.removeNode(line.path);
+                    break;
+                case REMOVE:
+                    removed++;
+                    // ignore if already removed
+                    treeStore.removeNode(line.path);
+                    break;
+                }
+            }
+        }
+        LOG.info("Merging completed; added {}, modified {}, upserted {}, deleted {}, removed {}",
+                added, modified, upserted, deleted, removed);
+        treeStore.close();
+    }
+
+    private static void mergeIndexStore(File baseDir, File incrementalFile, Compression algorithm, File mergedDir) throws IOException {
         TreeStore baseStore = new TreeStore("base", baseDir, new NodeStateEntryReader(null), 10);
         TreeStore mergedStore = new TreeStore("merged", mergedDir, new NodeStateEntryReader(null), 10);
         mergedStore.getSession().init();
@@ -107,7 +170,12 @@ private void mergeIndexStore(File baseDir, File mergedDir) throws IOException {
             StoreEntry write;
             if (base == null) {
                 // base EOF: we expect ADD
-                if (increment.operation != IncrementalStoreOperand.ADD) {
+                switch (increment.operation) {
+                case ADD:
+                case UPSERT:
+                    // ok
+                    break;
+                default:
                     LOG.warn(
                             "Expected ADD but got {} for incremental path {} value {}. " +
" + "Merging will proceed, but this is unexpected.", @@ -151,8 +219,10 @@ private void mergeIndexStore(File baseDir, File mergedDir) throws IOException { increment.operation, increment.path, increment.value); break; case MODIFY: + case UPSERT: break; case DELETE: + case REMOVE: write = null; } } @@ -173,36 +243,36 @@ private void mergeIndexStore(File baseDir, File mergedDir) throws IOException { mergedStore.close(); } - static class StoreEntry { - final String path; - final String value; - final IncrementalStoreOperand operation; + static class StoreEntry { + final String path; + final String value; + final IncrementalStoreOperand operation; - StoreEntry(String path, String value, IncrementalStoreOperand operation) { - this.path = path; - this.value = value; - this.operation = operation; - } + StoreEntry(String path, String value, IncrementalStoreOperand operation) { + this.path = path; + this.value = value; + this.operation = operation; + } - static StoreEntry readFromTreeStore(Iterator> it) { - while (it.hasNext()) { - Map.Entry e = it.next(); - if (!e.getValue().isEmpty()) { - return new StoreEntry(e.getKey(), e.getValue(), null); - } + static StoreEntry readFromTreeStore(Iterator> it) { + while (it.hasNext()) { + Map.Entry e = it.next(); + if (!e.getValue().isEmpty()) { + return new StoreEntry(e.getKey(), e.getValue(), null); } - return null; } + return null; + } - static StoreEntry readFromReader(BufferedReader reader) throws IOException { - String line = reader.readLine(); - if (line == null) { - return null; - } - String[] parts = IncrementalFlatFileStoreNodeStateEntryWriter.getParts(line); - return new StoreEntry(parts[0], parts[1], OPERATION_MAP.get(parts[3])); + static StoreEntry readFromReader(BufferedReader reader) throws IOException { + String line = reader.readLine(); + if (line == null) { + return null; } + String[] parts = IncrementalFlatFileStoreNodeStateEntryWriter.getParts(line); + return new StoreEntry(parts[0], parts[1], OPERATION_MAP.get(parts[3])); } + } private IndexStoreMetadata getIndexStoreMetadataForMergedFile() throws IOException { diff --git a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/TreeStore.java b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/TreeStore.java index 6854d37f159..cca03a67748 100644 --- a/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/TreeStore.java +++ b/oak-run-commons/src/main/java/org/apache/jackrabbit/oak/index/indexer/document/tree/TreeStore.java @@ -368,10 +368,25 @@ public static String toChildNodeEntry(String parentPath, String childName) { return parentPath + "\t" + childName; } + public void removeNode(String path) { + if (readOnly) { + throw new IllegalStateException("Read only store"); + } + session.put(path, null); + if (!path.equals("/")) { + String nodeName = PathUtils.getName(path); + String parentPath = PathUtils.getParentPath(path); + session.put(parentPath + "\t" + nodeName, null); + } + } + public void putNode(String path, String json) { if (readOnly) { throw new IllegalStateException("Read only store"); } + if (json == null) { + throw new IllegalStateException("Value is null"); + } session.put(path, json); if (!path.equals("/")) { String nodeName = PathUtils.getName(path); diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreIT.java 
diff --git a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreIT.java b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreIT.java
index 3318cc77c28..7a37186b333 100644
--- a/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreIT.java
+++ b/oak-run-commons/src/test/java/org/apache/jackrabbit/oak/index/indexer/document/flatfile/pipelined/PipelinedTreeStoreIT.java
@@ -675,6 +675,7 @@ private PipelinedTreeStoreStrategy createStrategy(MongoTestBackend backend, Pred
                 pathPredicate,
                 mongoRegexPathFilter,
                 null,
+                0,
                 statsProvider,
                 indexingReporter);
     }