Skip to content

Commit

Permalink
ReferenceLogic: parameterized purge of the commit log of a Reference
Browse files Browse the repository at this point in the history
Just implements the functionality to "shorten" the commit log history of a `Reference`. No API yet and this should still be optimized to reduce heap pressure by memoizing only `ObjId`s instead of `CommitObj`s.

Exposing this functionality is not yet adviseable, because it would be meaningles w/o projectnessie#9688 in place and exposed as a functionality as well.

Fixes projectnessie#9733
  • Loading branch information
snazy committed Oct 10, 2024
1 parent 3ba572b commit 125e66f
Show file tree
Hide file tree
Showing 3 changed files with 217 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,19 @@
import static org.mockito.Mockito.spy;
import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8;
import static org.projectnessie.versioned.storage.common.config.StoreConfig.CONFIG_COMMIT_TIMEOUT_MILLIS;
import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key;
import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery;
import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd;
import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder;
import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO;
import static org.projectnessie.versioned.storage.common.logic.InternalRef.allInternalRefs;
import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic;
import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic;
import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic;
import static org.projectnessie.versioned.storage.common.logic.PagingToken.emptyPagingToken;
import static org.projectnessie.versioned.storage.common.logic.PagingToken.pagingToken;
import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery;
import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders;
import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID;
import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromString;
import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId;
Expand All @@ -49,9 +54,11 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -64,15 +71,18 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException;
import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException;
import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException;
import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException;
import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement;
import org.projectnessie.versioned.storage.common.logic.InternalRef;
import org.projectnessie.versioned.storage.common.logic.PagedResult;
import org.projectnessie.versioned.storage.common.logic.PagingToken;
import org.projectnessie.versioned.storage.common.logic.ReferenceLogic;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.objtypes.CommitType;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Persist;
Expand All @@ -94,6 +104,111 @@ protected AbstractReferenceLogicTests(Class<?> surroundingTestClass) {
this.surroundingTestClass = surroundingTestClass;
}

@ParameterizedTest
@CsvSource({
"0, 1", "1, 1", "5, 1", "50, 1", "5, 3", "50, 10",
})
public void rewriteCommitLog(int numSourceCommits, int cutoff) throws Exception {
var referenceLogic = referenceLogic(persist);
var commitLogic = commitLogic(persist);
var indexesLogic = indexesLogic(persist);

var commits = new ArrayList<CommitObj>();
var head = EMPTY_OBJ_ID;
for (int i = 0; i < numSourceCommits; i++) {
var commit =
commitLogic.doCommit(
newCommitBuilder()
.message("commit #" + i)
.parentCommitId(head)
.headers(newCommitHeaders().add("random", UUID.randomUUID().toString()).build())
.addAdds(
commitAdd(key("key", "num-" + i), 1, EMPTY_OBJ_ID, null, UUID.randomUUID()))
.build(),
List.of());
commits.add(commit);
soft.assertThat(commit.seq()).isEqualTo(1 + i);
head = commit.id();
}

var reference =
referenceLogic.createReference("rewrite-commits-" + numSourceCommits, head, null);

var expectedCommitSeq = new AtomicInteger(numSourceCommits);

var notRewritten =
referenceLogic.rewriteCommitLog(
reference,
(num, commit) -> {
soft.assertThat(commit.seq()).isEqualTo(expectedCommitSeq.getAndDecrement());
soft.assertThat(num - 1).isEqualTo(numSourceCommits - commit.seq());
soft.assertThat(commit).isEqualTo(commits.get(numSourceCommits - num));
return false;
});

soft.assertThat(notRewritten).isSameAs(reference);

var rewritten = referenceLogic.rewriteCommitLog(reference, (num, commit) -> num == cutoff);

if (numSourceCommits == 0 || numSourceCommits == 1) {
// nothing changed
soft.assertThat(rewritten).isSameAs(reference);
} else {
soft.assertThat(rewritten.pointer()).isNotEqualTo(reference.pointer());

var newCommits = newArrayList(commitLogic.commitLog(commitLogQuery(rewritten.pointer())));
soft.assertThat(newCommits).hasSize(cutoff);

var newHead = commitLogic.fetchCommit(rewritten.pointer());

if (cutoff > 1) {
for (int i = 0; i < cutoff - 1; i++) {
var newCommit = newCommits.get(i);
var oldCommit = commits.get(numSourceCommits - i - 1);

soft.assertThat(newCommit.directParent()).isEqualTo(newCommits.get(i + 1).id());
soft.assertThat(newCommit.seq()).isEqualTo(cutoff - i);

var newContents =
newArrayList(indexesLogic.incrementalIndexFromCommit(newCommit)).stream()
.map(StoreIndexElement::content)
.collect(Collectors.toList());
var oldContents =
newArrayList(indexesLogic.incrementalIndexFromCommit(oldCommit)).stream()
.map(StoreIndexElement::content)
.collect(Collectors.toList());

soft.assertThat(newContents).isEqualTo(oldContents);
}

var numNonIncremetalOps =
newArrayList(indexesLogic.incrementalIndexFromCommit(newHead)).stream()
.filter(c -> c.content().action().currentCommit())
.count();
soft.assertThat(numNonIncremetalOps).isEqualTo(1);
soft.assertThat(newHead.seq()).isEqualTo(cutoff);
soft.assertThat(newHead.directParent()).isNotEqualTo(EMPTY_OBJ_ID);
soft.assertThat(newHead.tail()).hasSize(cutoff);
soft.assertThat(newHead.tail().get(cutoff - 1)).isEqualTo(EMPTY_OBJ_ID);
} else {
var numNonIncremetalOps =
newArrayList(indexesLogic.incrementalIndexFromCommit(newHead)).stream()
.filter(c -> c.content().action().currentCommit())
.count();
soft.assertThat(numNonIncremetalOps).isEqualTo(numSourceCommits);
soft.assertThat(newHead.seq()).isEqualTo(1);
soft.assertThat(newHead.directParent()).isEqualTo(EMPTY_OBJ_ID);
soft.assertThat(newHead.tail()).containsExactly(EMPTY_OBJ_ID);
}

soft.assertThat(indexesLogic.buildCompleteIndexOrEmpty(newHead).asKeyList())
.containsExactlyInAnyOrderElementsOf(
IntStream.range(0, numSourceCommits)
.mapToObj(i -> key("key", "num-" + i))
.collect(Collectors.toList()));
}
}

@SuppressWarnings("BusyWait")
@RepeatedTest(2)
public void refCreationDeletionWithConcurrentRefsListing() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@
import jakarta.annotation.Nullable;
import java.util.Collections;
import java.util.List;
import java.util.function.BiPredicate;
import org.projectnessie.versioned.storage.common.exceptions.CommitConflictException;
import org.projectnessie.versioned.storage.common.exceptions.ObjNotFoundException;
import org.projectnessie.versioned.storage.common.exceptions.RefAlreadyExistsException;
import org.projectnessie.versioned.storage.common.exceptions.RefConditionFailedException;
import org.projectnessie.versioned.storage.common.exceptions.RefNotFoundException;
import org.projectnessie.versioned.storage.common.exceptions.RetryTimeoutException;
import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
import org.projectnessie.versioned.storage.common.persist.ObjId;
import org.projectnessie.versioned.storage.common.persist.Reference;

Expand Down Expand Up @@ -115,4 +119,26 @@ void deleteReference(@Nonnull String name, @Nonnull ObjId expectedPointer)
@Nonnull
Reference assignReference(@Nonnull Reference current, @Nonnull ObjId newPointer)
throws RefNotFoundException, RefConditionFailedException;

/**
* Rewrites the commit log of the given reference up to including the first commit for which the
* given predicate returns {@code true}.
*
* <p>Read the commit log of {@code current} until {@code cutoffPredicate} returns {@code true}.
* All commits that have been read so far, including the one for which the predicate returned
* {@code true}, will be included. The reference will be updated with the "tip/HEAD" of the newest
* written commit. If the predicate never returned {@code true}, no new commits will be written
* and the reference will not be updated.
*
* @param current the commit log of this reference is going to be rewritten
* @param cutoffPredicate predicate receiving the number of the commit (starting at {@code 1}) and
* the {@link CommitObj}
* @return the updated reference or just {@code current}, if no new commits were written
*/
Reference rewriteCommitLog(
@Nonnull Reference current, BiPredicate<Integer, CommitObj> cutoffPredicate)
throws RefNotFoundException,
RefConditionFailedException,
CommitConflictException,
ObjNotFoundException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8;
import static org.projectnessie.versioned.storage.common.indexes.StoreIndexElement.indexElement;
import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key;
import static org.projectnessie.versioned.storage.common.logic.CommitConflict.ConflictType.KEY_EXISTS;
import static org.projectnessie.versioned.storage.common.logic.CommitRetry.commitRetry;
Expand All @@ -38,6 +39,7 @@
import static org.projectnessie.versioned.storage.common.logic.ReferenceLogicImpl.CommitReferenceResult.Kind.REF_ROW_EXISTS;
import static org.projectnessie.versioned.storage.common.logic.ReferenceLogicImpl.CommitReferenceResult.Kind.REF_ROW_MISSING;
import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders;
import static org.projectnessie.versioned.storage.common.objtypes.CommitOp.commitOp;
import static org.projectnessie.versioned.storage.common.objtypes.RefObj.ref;
import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.COMMIT;
import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.REF;
Expand All @@ -51,11 +53,13 @@
import jakarta.annotation.Nonnull;
import jakarta.annotation.Nullable;
import java.time.Instant;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;
import java.util.function.Supplier;
import org.projectnessie.nessie.relocated.protobuf.ByteString;
Expand Down Expand Up @@ -679,6 +683,78 @@ public Reference assignReference(@Nonnull Reference current, @Nonnull ObjId newP
return persist.updateReferencePointer(current, newPointer);
}

@Override
public Reference rewriteCommitLog(
@Nonnull Reference current, BiPredicate<Integer, CommitObj> cutoffPredicate)
throws RefNotFoundException,
RefConditionFailedException,
CommitConflictException,
ObjNotFoundException {
var commitLogic = commitLogic(persist);
var indexesLogic = indexesLogic(persist);

var log = commitLogic.commitLog(CommitLogQuery.commitLogQuery(current.pointer()));
var commits = new ArrayDeque<CommitObj>();
for (int n = 1; log.hasNext(); n++) {
var commit = log.next();

if (cutoffPredicate.test(n, commit)) {
// THIS is the last commit to retain

if (commit.directParent().equals(EMPTY_OBJ_ID)) {
return current;
}

// Build the "oldest" commit
var c =
newCommitBuilder()
.message(commit.message())
.parentCommitId(EMPTY_OBJ_ID)
.headers(commit.headers());
for (StoreIndexElement<CommitOp> element : indexesLogic.buildCompleteIndexOrEmpty(commit)) {
var op = element.content();
if (op.action().exists()) {
c.addAdds(commitAdd(element.key(), op.payload(), op.value(), null, op.contentId()));
}
}
var head = commitLogic.buildCommitObj(c.build());
commitLogic.storeCommit(head, List.of());

// Build the "next newer" commit(s)
while (!commits.isEmpty()) {
commit = commits.removeLast();

c =
newCommitBuilder()
.message(commit.message())
.parentCommitId(head.id())
.headers(commit.headers());
for (StoreIndexElement<CommitOp> element :
indexesLogic.incrementalIndexFromCommit(commit)) {
var op = element.content();
if (op.action().currentCommit()) {
if (op.action().exists()) {
c.addAdds(commitAdd(element.key(), op.payload(), op.value(), null, op.contentId()));
} else {
c.addRemoves(commitRemove(element.key(), op.payload(), op.value(), op.contentId()));
}
}
}
head = commitLogic.buildCommitObj(c.build());
commitLogic.storeCommit(head, List.of());
}

return persist.updateReferencePointer(current, head.id());
}

// memoize the
commits.addLast(commit);
}

// no cut-off found, just return "current"
return current;
}

private Reference maybeRecover(
@Nonnull String name,
Reference ref,
Expand Down

0 comments on commit 125e66f

Please sign in to comment.