diff --git a/versioned/storage/common-tests/src/main/java/org/projectnessie/versioned/storage/commontests/AbstractReferenceLogicTests.java b/versioned/storage/common-tests/src/main/java/org/projectnessie/versioned/storage/commontests/AbstractReferenceLogicTests.java index d83252dbdd6..11472365fc7 100644 --- a/versioned/storage/common-tests/src/main/java/org/projectnessie/versioned/storage/commontests/AbstractReferenceLogicTests.java +++ b/versioned/storage/common-tests/src/main/java/org/projectnessie/versioned/storage/commontests/AbstractReferenceLogicTests.java @@ -31,14 +31,19 @@ import static org.mockito.Mockito.spy; import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO; import static org.projectnessie.versioned.storage.common.logic.InternalRef.allInternalRefs; import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; import static org.projectnessie.versioned.storage.common.logic.PagingToken.emptyPagingToken; import static org.projectnessie.versioned.storage.common.logic.PagingToken.pagingToken; import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromString; import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; @@ -49,9 +54,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.IntFunction; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -64,15 +71,18 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.ValueSource; import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException; import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException; import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException; import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; import org.projectnessie.versioned.storage.common.logic.InternalRef; import org.projectnessie.versioned.storage.common.logic.PagedResult; import org.projectnessie.versioned.storage.common.logic.PagingToken; import org.projectnessie.versioned.storage.common.logic.ReferenceLogic; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; import org.projectnessie.versioned.storage.common.objtypes.CommitType; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Persist; @@ -94,6 +104,111 @@ protected AbstractReferenceLogicTests(Class surroundingTestClass) { this.surroundingTestClass = surroundingTestClass; } + @ParameterizedTest + @CsvSource({ + "0, 1", "1, 1", "5, 1", "50, 1", "5, 3", "50, 10", + }) + public void rewriteCommitLog(int numSourceCommits, int cutoff) throws Exception { + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + var indexesLogic = indexesLogic(persist); + + var commits = new ArrayList(); + var head = EMPTY_OBJ_ID; + for (int i = 0; i < numSourceCommits; i++) { + var commit = + commitLogic.doCommit( + newCommitBuilder() + .message("commit #" + i) + .parentCommitId(head) + .headers(newCommitHeaders().add("random", UUID.randomUUID().toString()).build()) + .addAdds( + commitAdd(key("key", "num-" + i), 1, EMPTY_OBJ_ID, null, UUID.randomUUID())) + .build(), + List.of()); + commits.add(commit); + soft.assertThat(commit.seq()).isEqualTo(1 + i); + head = commit.id(); + } + + var reference = + referenceLogic.createReference("rewrite-commits-" + numSourceCommits, head, null); + + var expectedCommitSeq = new AtomicInteger(numSourceCommits); + + var notRewritten = + referenceLogic.rewriteCommitLog( + reference, + (num, commit) -> { + soft.assertThat(commit.seq()).isEqualTo(expectedCommitSeq.getAndDecrement()); + soft.assertThat(num - 1).isEqualTo(numSourceCommits - commit.seq()); + soft.assertThat(commit).isEqualTo(commits.get(numSourceCommits - num)); + return false; + }); + + soft.assertThat(notRewritten).isSameAs(reference); + + var rewritten = referenceLogic.rewriteCommitLog(reference, (num, commit) -> num == cutoff); + + if (numSourceCommits == 0 || numSourceCommits == 1) { + // nothing changed + soft.assertThat(rewritten).isSameAs(reference); + } else { + soft.assertThat(rewritten.pointer()).isNotEqualTo(reference.pointer()); + + var newCommits = newArrayList(commitLogic.commitLog(commitLogQuery(rewritten.pointer()))); + soft.assertThat(newCommits).hasSize(cutoff); + + var newHead = commitLogic.fetchCommit(rewritten.pointer()); + + if (cutoff > 1) { + for (int i = 0; i < cutoff - 1; i++) { + var newCommit = newCommits.get(i); + var oldCommit = commits.get(numSourceCommits - i - 1); + + soft.assertThat(newCommit.directParent()).isEqualTo(newCommits.get(i + 1).id()); + soft.assertThat(newCommit.seq()).isEqualTo(cutoff - i); + + var newContents = + newArrayList(indexesLogic.incrementalIndexFromCommit(newCommit)).stream() + .map(StoreIndexElement::content) + .collect(Collectors.toList()); + var oldContents = + newArrayList(indexesLogic.incrementalIndexFromCommit(oldCommit)).stream() + .map(StoreIndexElement::content) + .collect(Collectors.toList()); + + soft.assertThat(newContents).isEqualTo(oldContents); + } + + var numNonIncremetalOps = + newArrayList(indexesLogic.incrementalIndexFromCommit(newHead)).stream() + .filter(c -> c.content().action().currentCommit()) + .count(); + soft.assertThat(numNonIncremetalOps).isEqualTo(1); + soft.assertThat(newHead.seq()).isEqualTo(cutoff); + soft.assertThat(newHead.directParent()).isNotEqualTo(EMPTY_OBJ_ID); + soft.assertThat(newHead.tail()).hasSize(cutoff); + soft.assertThat(newHead.tail().get(cutoff - 1)).isEqualTo(EMPTY_OBJ_ID); + } else { + var numNonIncremetalOps = + newArrayList(indexesLogic.incrementalIndexFromCommit(newHead)).stream() + .filter(c -> c.content().action().currentCommit()) + .count(); + soft.assertThat(numNonIncremetalOps).isEqualTo(numSourceCommits); + soft.assertThat(newHead.seq()).isEqualTo(1); + soft.assertThat(newHead.directParent()).isEqualTo(EMPTY_OBJ_ID); + soft.assertThat(newHead.tail()).containsExactly(EMPTY_OBJ_ID); + } + + soft.assertThat(indexesLogic.buildCompleteIndexOrEmpty(newHead).asKeyList()) + .containsExactlyInAnyOrderElementsOf( + IntStream.range(0, numSourceCommits) + .mapToObj(i -> key("key", "num-" + i)) + .collect(Collectors.toList())); + } + } + @SuppressWarnings("BusyWait") @RepeatedTest(2) public void refCreationDeletionWithConcurrentRefsListing() throws Exception { diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java index 5fc664765fc..5eefd21afdb 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogic.java @@ -19,10 +19,14 @@ import jakarta.annotation.Nullable; import java.util.Collections; import java.util.List; +import java.util.function.BiPredicate; +import org.projectnessie.versioned.storage.common.exceptions.CommitConflictException; +import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException; import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException; import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException; import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException; import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; import org.projectnessie.versioned.storage.common.persist.ObjId; import org.projectnessie.versioned.storage.common.persist.Reference; @@ -115,4 +119,26 @@ void deleteReference(@Nonnull String name, @Nonnull ObjId expectedPointer) @Nonnull Reference assignReference(@Nonnull Reference current, @Nonnull ObjId newPointer) throws RefNotFoundException, RefConditionFailedException; + + /** + * Rewrites the commit log of the given reference up to including the first commit for which the + * given predicate returns {@code true}. + * + *

Read the commit log of {@code current} until {@code cutoffPredicate} returns {@code true}. + * All commits that have been read so far, including the one for which the predicate returned + * {@code true}, will be included. The reference will be updated with the "tip/HEAD" of the newest + * written commit. If the predicate never returned {@code true}, no new commits will be written + * and the reference will not be updated. + * + * @param current the commit log of this reference is going to be rewritten + * @param cutoffPredicate predicate receiving the number of the commit (starting at {@code 1}) and + * the {@link CommitObj} + * @return the updated reference or just {@code current}, if no new commits were written + */ + Reference rewriteCommitLog( + @Nonnull Reference current, BiPredicate cutoffPredicate) + throws RefNotFoundException, + RefConditionFailedException, + CommitConflictException, + ObjNotFoundException; } diff --git a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java index acabd9ed1d3..b9a3c92ccae 100644 --- a/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java +++ b/versioned/storage/common/src/main/java/org/projectnessie/versioned/storage/common/logic/ReferenceLogicImpl.java @@ -23,6 +23,7 @@ import static java.util.Collections.emptyList; import static java.util.Objects.requireNonNull; import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.common.indexes.StoreIndexElement.indexElement; import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; import static org.projectnessie.versioned.storage.common.logic.CommitConflict.ConflictType.KEY_EXISTS; import static org.projectnessie.versioned.storage.common.logic.CommitRetry.commitRetry; @@ -38,6 +39,7 @@ import static org.projectnessie.versioned.storage.common.logic.ReferenceLogicImpl.CommitReferenceResult.Kind.REF_ROW_EXISTS; import static org.projectnessie.versioned.storage.common.logic.ReferenceLogicImpl.CommitReferenceResult.Kind.REF_ROW_MISSING; import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitOp.commitOp; import static org.projectnessie.versioned.storage.common.objtypes.RefObj.ref; import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.COMMIT; import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.REF; @@ -51,11 +53,13 @@ import jakarta.annotation.Nonnull; import jakarta.annotation.Nullable; import java.time.Instant; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.Optional; +import java.util.function.BiPredicate; import java.util.function.Function; import java.util.function.Supplier; import org.projectnessie.nessie.relocated.protobuf.ByteString; @@ -679,6 +683,78 @@ public Reference assignReference(@Nonnull Reference current, @Nonnull ObjId newP return persist.updateReferencePointer(current, newPointer); } + @Override + public Reference rewriteCommitLog( + @Nonnull Reference current, BiPredicate cutoffPredicate) + throws RefNotFoundException, + RefConditionFailedException, + CommitConflictException, + ObjNotFoundException { + var commitLogic = commitLogic(persist); + var indexesLogic = indexesLogic(persist); + + var log = commitLogic.commitLog(CommitLogQuery.commitLogQuery(current.pointer())); + var commits = new ArrayDeque(); + for (int n = 1; log.hasNext(); n++) { + var commit = log.next(); + + if (cutoffPredicate.test(n, commit)) { + // THIS is the last commit to retain + + if (commit.directParent().equals(EMPTY_OBJ_ID)) { + return current; + } + + // Build the "oldest" commit + var c = + newCommitBuilder() + .message(commit.message()) + .parentCommitId(EMPTY_OBJ_ID) + .headers(commit.headers()); + for (StoreIndexElement element : indexesLogic.buildCompleteIndexOrEmpty(commit)) { + var op = element.content(); + if (op.action().exists()) { + c.addAdds(commitAdd(element.key(), op.payload(), op.value(), null, op.contentId())); + } + } + var head = commitLogic.buildCommitObj(c.build()); + commitLogic.storeCommit(head, List.of()); + + // Build the "next newer" commit(s) + while (!commits.isEmpty()) { + commit = commits.removeLast(); + + c = + newCommitBuilder() + .message(commit.message()) + .parentCommitId(head.id()) + .headers(commit.headers()); + for (StoreIndexElement element : + indexesLogic.incrementalIndexFromCommit(commit)) { + var op = element.content(); + if (op.action().currentCommit()) { + if (op.action().exists()) { + c.addAdds(commitAdd(element.key(), op.payload(), op.value(), null, op.contentId())); + } else { + c.addRemoves(commitRemove(element.key(), op.payload(), op.value(), op.contentId())); + } + } + } + head = commitLogic.buildCommitObj(c.build()); + commitLogic.storeCommit(head, List.of()); + } + + return persist.updateReferencePointer(current, head.id()); + } + + // memoize the + commits.addLast(commit); + } + + // no cut-off found, just return "current" + return current; + } + private Reference maybeRecover( @Nonnull String name, Reference ref,