diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts
index b068c557f4..d7cb70f8dd 100644
--- a/bom/build.gradle.kts
+++ b/bom/build.gradle.kts
@@ -88,6 +88,7 @@ dependencies {
api(project(":nessie-versioned-storage-cassandra-tests"))
api(project(":nessie-versioned-storage-cassandra2"))
api(project(":nessie-versioned-storage-cassandra2-tests"))
+ api(project(":nessie-versioned-storage-cleanup"))
api(project(":nessie-versioned-storage-common"))
api(project(":nessie-versioned-storage-common-proto"))
api(project(":nessie-versioned-storage-common-serialize"))
diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties
index 6abd8850df..d9051f5880 100644
--- a/gradle/projects.main.properties
+++ b/gradle/projects.main.properties
@@ -71,6 +71,7 @@ nessie-versioned-storage-cassandra=versioned/storage/cassandra
nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests
nessie-versioned-storage-cassandra2=versioned/storage/cassandra2
nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests
+nessie-versioned-storage-cleanup=versioned/storage/cleanup
nessie-versioned-storage-common=versioned/storage/common
nessie-versioned-storage-common-proto=versioned/storage/common-proto
nessie-versioned-storage-common-serialize=versioned/storage/common-serialize
diff --git a/versioned/storage/cleanup/build.gradle.kts b/versioned/storage/cleanup/build.gradle.kts
new file mode 100644
index 0000000000..fa457c1872
--- /dev/null
+++ b/versioned/storage/cleanup/build.gradle.kts
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2022 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins { id("nessie-conventions-server") }
+
+publishingHelper { mavenName = "Nessie - Storage - Cleanup unreferenced objects" }
+
+description = "Identify and purge unreferenced objects in the Nessie repository."
+
+dependencies {
+ implementation(project(":nessie-model"))
+ implementation(project(":nessie-versioned-storage-common"))
+ implementation(project(":nessie-versioned-spi"))
+ implementation(project(":nessie-versioned-transfer-related"))
+
+ compileOnly(libs.jakarta.validation.api)
+ compileOnly(libs.jakarta.annotation.api)
+ compileOnly(libs.microprofile.openapi)
+
+ compileOnly(platform(libs.jackson.bom))
+ compileOnly("com.fasterxml.jackson.core:jackson-annotations")
+
+ compileOnly(libs.errorprone.annotations)
+ implementation(libs.guava)
+ implementation(libs.agrona)
+ implementation(libs.slf4j.api)
+
+ compileOnly(project(":nessie-versioned-storage-testextension"))
+
+ compileOnly(project(":nessie-immutables"))
+ annotationProcessor(project(":nessie-immutables", configuration = "processor"))
+
+ testImplementation(project(":nessie-versioned-storage-testextension"))
+ testImplementation(project(":nessie-versioned-storage-inmemory"))
+ testImplementation(project(":nessie-versioned-tests"))
+ testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow"))
+ testImplementation(platform(libs.junit.bom))
+ testImplementation(libs.bundles.junit.testing)
+ testRuntimeOnly(libs.logback.classic)
+
+ testCompileOnly(project(":nessie-immutables"))
+ testAnnotationProcessor(project(":nessie-immutables", configuration = "processor"))
+
+ testCompileOnly(libs.microprofile.openapi)
+
+ testCompileOnly(platform(libs.jackson.bom))
+ testCompileOnly("com.fasterxml.jackson.core:jackson-annotations")
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java
new file mode 100644
index 0000000000..57f1bf1cfc
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter.referencedObjectsPurgeFilter;
+import static org.projectnessie.versioned.storage.cleanup.ReferencedObjectsContext.objectsResolverContext;
+
+import org.projectnessie.versioned.storage.common.persist.Obj;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+
+/**
+ * Primary point of entry to remove unreferenced objects from Nessie's backend database.
+ *
+ *
+ * <p>Simplified example code flow:
+ * var params =
+ * CleanupParams.builder().build();
+ * var cleanup =
+ * createCleanup(params);
+ *
+ * var referencedObjectsContext =
+ * cleanup.buildReferencedObjectsContext(persist,
+ * TimeUnit.MILLISECONDS.toMicros(
+ * Instant.now().minus(3, ChronoUnit.DAYS)
+ * .toEpochMilli()));
+ * var referencedObjectsResolver =
+ * cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+ *
+ * // Must handle MustRestartWithBiggerFilterException
+ * var resolveResult =
+ * referencedObjectsResolver.resolve();
+ *
+ * var purgeObjects =
+ * cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+ * var purgeResult =
+ * purgeObjects.purge();
+ *
+ */
+public class Cleanup {
+ private final CleanupParams cleanupParams;
+
+ private Cleanup(CleanupParams cleanupParams) {
+ this.cleanupParams = cleanupParams;
+ }
+
+ public static Cleanup createCleanup(CleanupParams params) {
+ return new Cleanup(params);
+ }
+
+ /**
+ * Create the context holder used when identifying referenced objects and purging unreferenced
+ * objects.
+ *
+ *
+ * <p>Choosing an appropriate value for {@code maxObjReferenced} is crucial. Technically, this
+ * value must be at max the current timestamp - but logically {@code maxObjReferenced} should be
+ * the timestamp of a few days ago to not delete unreferenced objects too early and give users a
+ * chance to reset branches to another commit ID in case some table/view metadata is broken.
+ *
+ *
+ * <p>Uses an instance of {@link
+ * org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter} using a
+ * bloom filter based {@link ReferencedObjectsFilter}, both configured using {@link
+ * CleanupParams}'s attributes.
+ *
+ * @param persist the persistence/repository to run against
+ * @param maxObjReferenced only {@link Obj}s with a {@link Obj#referenced()} older than {@code
+ * maxObjReferenced} will be deleted. Production workloads should set this to something like
+ * "now minus 7 days" to have the chance to reset branches, just in case. Technically, this
+ * value must not be greater than "now". "Now" should be inquired using {@code
+ * Persist.config().clock().instant()}.
+ */
+ public ReferencedObjectsContext buildReferencedObjectsContext(
+ Persist persist, long maxObjReferenced) {
+ var referencedObjects = new ReferencedObjectsFilterImpl(cleanupParams);
+ var purgeFilter = referencedObjectsPurgeFilter(referencedObjects, maxObjReferenced);
+ return objectsResolverContext(persist, cleanupParams, referencedObjects, purgeFilter);
+ }
+
+ /**
+ * Creates a new objects-resolver instance to identify referenced objects, which must be
+ * retained.
+ *
+ * @param objectsResolverContext context, preferably created using {@link
+ * #buildReferencedObjectsContext(Persist, long)}
+ */
+ public ReferencedObjectsResolver createReferencedObjectsResolver(
+ ReferencedObjectsContext objectsResolverContext) {
+ return new ReferencedObjectsResolverImpl(
+ objectsResolverContext, cleanupParams.rateLimitFactory());
+ }
+
+ /**
+ * Creates a new objects-purger instance to delete unreferenced objects.
+ *
+ * @param purgeObjectsContext return value of {@link ReferencedObjectsResolver#resolve()}.
+ */
+ public PurgeObjects createPurgeObjects(PurgeObjectsContext purgeObjectsContext) {
+ return new PurgeObjectsImpl(purgeObjectsContext, cleanupParams.rateLimitFactory());
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java
new file mode 100644
index 0000000000..1814259ef6
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java
@@ -0,0 +1,195 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REFS;
+import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO;
+import static org.projectnessie.versioned.transfer.related.CompositeTransferRelatedObjects.createCompositeTransferRelatedObjects;
+
+import java.util.List;
+import java.util.function.IntFunction;
+import org.immutables.value.Value;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
+import org.projectnessie.versioned.storage.common.persist.Obj;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+import org.projectnessie.versioned.transfer.related.TransferRelatedObjects;
+
+/**
+ * Technically and implementation oriented parameters for Nessie's backend database cleanup,
+ * considered for internal use only.
+ *
+ *
+ * <p>Any API or functionality that exposes Nessie's backend database cleanup must provide a
+ * functionally oriented way for configuration and generate a {@link CleanupParams} from it.
+ */
+@NessieImmutable
+public interface CleanupParams {
+ // Following defaults result in a serialized bloom filter size of about 3000000 bytes.
+ long DEFAULT_EXPECTED_OBJ_COUNT = 1_000_000L;
+ double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d;
+ double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d;
+ boolean DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS = false;
+ int DEFAULT_PENDING_OBJS_BATCH_SIZE = 20;
+ int DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE = 100_000;
+
+ static ImmutableCleanupParams.Builder builder() {
+ return ImmutableCleanupParams.builder();
+ }
+
+ /**
+ * Number of expected {@link Obj}s, defaults to {@value #DEFAULT_EXPECTED_OBJ_COUNT}, used to size
+ * the bloom filter identifying the referenced {@link Obj}s. If {@link
+ * ReferencedObjectsResolver#resolve()} throws {@link MustRestartWithBiggerFilterException}, it is
+ * recommended to increase this value.
+ */
+ @Value.Default
+ default long expectedObjCount() {
+ return DEFAULT_EXPECTED_OBJ_COUNT;
+ }
+
+ /**
+ * Returns an updated instance of {@code this} value with {@link #expectedObjCount()} increased by
+ * {@value #DEFAULT_EXPECTED_OBJ_COUNT} as a convenience function to handle {@link
+ * MustRestartWithBiggerFilterException} thrown by {@link ReferencedObjectsResolver#resolve()} .
+ */
+ default CleanupParams withIncreasedExpectedObjCount() {
+ return builder()
+ .from(this)
+ .expectedObjCount(expectedObjCount() + DEFAULT_EXPECTED_OBJ_COUNT)
+ .build();
+ }
+
+ /**
+ * Related to {@link #expectedObjCount()}, used to size the bloom filter identifying the
+ * referenced {@link Obj}s, defaults to {@value #DEFAULT_FALSE_POSITIVE_PROBABILITY}.
+ */
+ @Value.Default
+ default double falsePositiveProbability() {
+ return DEFAULT_FALSE_POSITIVE_PROBABILITY;
+ }
+
+ /**
+ * Maximum allowed FPP, checked when adding to the bloom filter identifying the referenced {@link
+ * Obj}s, defaults to {@value #DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY}. If this value is
+ * exceeded, a {@link MustRestartWithBiggerFilterException} will be thrown from {@link
+ * ReferencedObjectsResolver#resolve()}.
+ */
+ @Value.Default
+ default double allowedFalsePositiveProbability() {
+ return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY;
+ }
+
+ /** Helper functionality to identify related {@link Obj}s, see {@link TransferRelatedObjects}. */
+ @Value.Default
+ default TransferRelatedObjects relatedObjects() {
+ return createCompositeTransferRelatedObjects();
+ }
+
+ /**
+ * {@link ReferencedObjectsResolver} tries to not walk a commit more than once by memoizing the
+ * visited {@link CommitObj#id() commit IDs}, default is {@link
+ * #DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS}. Setting this to {@code true} disables this
+ * optimization.
+ */
+ @Value.Default
+ default boolean allowDuplicateCommitTraversals() {
+ return DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS;
+ }
+
+ /**
+ * Rate limit for commit objects per second during {@link ReferencedObjectsResolver#resolve()},
+ * default is unlimited. Any positive value enables rate limiting, any value {@code <=0} disables
+ * rate limiting.
+ */
+ @Value.Default
+ default int resolveCommitRatePerSecond() {
+ return 0;
+ }
+
+ /**
+ * Rate limit for (non commit) objects per second during {@link
+ * ReferencedObjectsResolver#resolve()}, default is unlimited. Any positive value enables rate
+ * limiting, any value {@code <=0} disables rate limiting.
+ */
+ @Value.Default
+ default int resolveObjRatePerSecond() {
+ return 0;
+ }
+
+ /**
+ * Rate limit for scanning objects per second during {@link PurgeObjects#purge()}, default is
+ * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate
+ * limiting.
+ */
+ @Value.Default
+ default int purgeScanObjRatePerSecond() {
+ return 0;
+ }
+
+ /**
+ * Rate limit for purging objects per second during {@link PurgeObjects#purge()}, default is
+ * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate
+ * limiting.
+ */
+ @Value.Default
+ default int purgeDeleteObjRatePerSecond() {
+ return 0;
+ }
+
+ /**
+ * {@link ReferencedObjectsResolver} attempts to fetch objects from the backend database in
+ * batches, this parameter defines the batch size, defaults to {@link
+ * #DEFAULT_PENDING_OBJS_BATCH_SIZE}.
+ */
+ @Value.Default
+ default int pendingObjsBatchSize() {
+ return DEFAULT_PENDING_OBJS_BATCH_SIZE;
+ }
+
+ /**
+ * Size of the "recent object IDs" filter to prevent processing the same {@link ObjId}s. This
+ * happens, when the values referenced from the commit index are iterated, because it iterates
+ * over all keys, not only the keys added by a particular commit.
+ *
+ *
+ * <p>The value defaults to {@value #DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE}. It should be higher than
+ * the maximum number of keys in a commit.
+ */
+ @Value.Default
+ default int recentObjIdsFilterSize() {
+ return DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE;
+ }
+
+ /** Rate limiter factory for the rate limits defined above, useful for testing purposes. */
+ @Value.Default
+ default IntFunction<RateLimit> rateLimitFactory() {
+ return RateLimit::create;
+ }
+
+ /** Defines the names of the Nessie internal references, do not change. */
+ @Value.Default
+ default List<String> internalReferenceNames() {
+ return List.of(REF_REFS.name(), REF_REPO.name());
+ }
+
+ /**
+ * Optionally enable a dry-run mode, which does not delete any objects from the backend database,
+ * defaults to {@code false}.
+ */
+ @Value.Default
+ default boolean dryRun() {
+ return false;
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java
new file mode 100644
index 0000000000..70b2c7c097
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java
@@ -0,0 +1,162 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static java.lang.String.format;
+
+final class HeapSizes {
+ private HeapSizes() {}
+
+ /*
+ org.agrona.collections.ObjectHashSet object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 4 (object header: class) N/A
+ 12 4 float ObjectHashSet.loadFactor N/A
+ 16 4 int ObjectHashSet.resizeThreshold N/A
+ 20 4 int ObjectHashSet.size N/A
+ 24 1 boolean ObjectHashSet.shouldAvoidAllocation N/A
+ 25 7 (alignment/padding gap)
+ 32 8 java.lang.Object[] ObjectHashSet.values N/A
+ 40 8 org.agrona.collections.ObjectHashSet.ObjectIterator ObjectHashSet.iterator N/A
+ 48 8 java.util.function.IntConsumer ObjectHashSet.resizeNotifier N/A
+ 56 8 (object alignment gap)
+ Instance size: 64 bytes
+ Space losses: 7 bytes internal + 8 bytes external = 15 bytes total
+ */
+ static final long HEAP_SIZE_OBJECT_HASH_SET = 64L;
+ /*
+ org.projectnessie.versioned.storage.common.persist.ObjId$ObjId256 object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 4 (object header: class) N/A
+ 12 4 (alignment/padding gap)
+ 16 8 long ObjId256.l0 N/A
+ 24 8 long ObjId256.l1 N/A
+ 32 8 long ObjId256.l2 N/A
+ 40 8 long ObjId256.l3 N/A
+ Instance size: 48 bytes
+ Space losses: 4 bytes internal + 0 bytes external = 4 bytes total
+ */
+ static final long HEAP_SIZE_OBJ_ID = 48L;
+ /*
+ long[] : 16 + 8*length
+ */
+ static final long HEAP_SIZE_PRIMITIVE_OBJ_ARRAY = 16L;
+
+ /*
+ com.google.common.hash.BloomFilter object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 8 (object header: class) N/A
+ 16 4 int BloomFilter.numHashFunctions N/A
+ 20 4 (alignment/padding gap)
+ 24 8 com.google.common.hash.BloomFilterStrategies.LockFreeBitArray BloomFilter.bits N/A
+ 32 8 com.google.common.hash.Funnel BloomFilter.funnel N/A
+ 40 8 com.google.common.hash.BloomFilter.Strategy BloomFilter.strategy N/A
+ Instance size: 48 bytes
+ Space losses: 4 bytes internal + 0 bytes external = 4 bytes total
+ */
+ static final long HEAP_SIZE_BLOOM_FILTER = 48L;
+ /*
+ com.google.common.hash.BloomFilterStrategies$LockFreeBitArray object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 8 (object header: class) N/A
+ 16 8 java.util.concurrent.atomic.AtomicLongArray LockFreeBitArray.data N/A
+ 24 8 com.google.common.hash.LongAddable LockFreeBitArray.bitCount N/A
+ Instance size: 32 bytes
+ Space losses: 0 bytes internal + 0 bytes external = 0 bytes total
+ */
+ static final long HEAP_SIZE_BIT_ARRAY = 32L;
+ /*
+ We assume that com.google.common.hash.LongAddables uses the pure-Java implementation, not Guava's
+ heap-expensive LongAdder implementation based on its Striped64 with 144 bytes per cell.
+
+ java.util.concurrent.atomic.AtomicLong object internals (com.google.common.hash.LongAddables.PureJavaLongAddable):
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 4 (object header: class) N/A
+ 12 4 (alignment/padding gap)
+ 16 8 long AtomicLong.value N/A
+ 24 8 (object alignment gap)
+ Instance size: 32 bytes
+ Space losses: 4 bytes internal + 8 bytes external = 12 bytes total
+ */
+ static final long HEAP_SIZE_LONG_ADDER = 40L;
+ /*
+ java.util.concurrent.atomic.AtomicLongArray object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 4 (object header: class) N/A
+ 12 4 (alignment/padding gap)
+ 16 8 long[] AtomicLongArray.array N/A
+ 24 8 (object alignment gap)
+ Instance size: 32 bytes
+ Space losses: 4 bytes internal + 8 bytes external = 12 bytes total
+ */
+ static final long HEAP_SIZE_ATOMIC_LONG_ARRAY = 32L;
+ /*
+ long[] : 16 + 8*length
+ */
+ static final long HEAP_SIZE_PRIMITIVE_LONG_ARRAY = 16L;
+
+ /*
+ java.util.LinkedHashMap object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 4 (object header: class) N/A
+ 12 4 int HashMap.size N/A
+ 16 8 java.util.Set AbstractMap.keySet N/A
+ 24 8 java.util.Collection AbstractMap.values N/A
+ 32 4 int HashMap.modCount N/A
+ 36 4 int HashMap.threshold N/A
+ 40 4 float HashMap.loadFactor N/A
+ 44 4 int LinkedHashMap.putMode N/A
+ 48 8 java.util.HashMap.Node[] HashMap.table N/A
+ 56 8 java.util.Set HashMap.entrySet N/A
+ 64 1 boolean LinkedHashMap.accessOrder N/A
+ 65 7 (alignment/padding gap)
+ 72 8 java.util.LinkedHashMap.Entry LinkedHashMap.head N/A
+ 80 8 java.util.LinkedHashMap.Entry LinkedHashMap.tail N/A
+ 88 8 (object alignment gap)
+ Instance size: 96 bytes
+ Space losses: 7 bytes internal + 8 bytes external = 15 bytes total
+ */
+ static final long HEAP_SIZE_LINKED_HASH_MAP = 96L;
+ /*
+ java.util.LinkedHashMap$Entry object internals:
+ OFF SZ TYPE DESCRIPTION VALUE
+ 0 8 (object header: mark) N/A
+ 8 8 (object header: class) N/A
+ 16 4 int Node.hash N/A
+ 20 4 (alignment/padding gap)
+ 24 8 java.lang.Object Node.key N/A
+ 32 8 java.lang.Object Node.value N/A
+ 40 8 java.util.HashMap.Node Node.next N/A
+ 48 8 java.util.LinkedHashMap.Entry Entry.before N/A
+ 56 8 java.util.LinkedHashMap.Entry Entry.after N/A
+ Instance size: 64 bytes
+ Space losses: 4 bytes internal + 0 bytes external = 4 bytes total
+ */
+ static final long HEAP_SIZE_LINKED_HASH_MAP_ENTRY = 64L;
+
+ static final long HEAP_SIZE_POINTER = 8L;
+
+ static String memSizeToStringMB(long bytes) {
+ return format("%.1f M", ((double) bytes) / 1024L / 1024L);
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java
new file mode 100644
index 0000000000..7349b9983c
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+/**
+ * Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this
+ * exception is encountered, the current garbage-collection run must be aborted and
+ * restarted with a bigger {@link CleanupParams#expectedObjCount()} value.
+ */
+public class MustRestartWithBiggerFilterException extends Exception {
+ public MustRestartWithBiggerFilterException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java
new file mode 100644
index 0000000000..a981b366b0
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+/**
+ * Internally used unchecked exception to eventually be "wrapped" by a checked {@link
+ * MustRestartWithBiggerFilterException}.
+ */
+class MustRestartWithBiggerFilterRuntimeException extends RuntimeException {
+ public MustRestartWithBiggerFilterRuntimeException(String msg) {
+ super(msg);
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java
new file mode 100644
index 0000000000..ad5cb5409d
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import jakarta.validation.constraints.NotNull;
+import java.util.List;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+import org.projectnessie.versioned.storage.common.persist.Obj;
+
+/** Filter to decide whether an {@link Obj} must be kept or whether it can be deleted. */
+public interface PurgeFilter {
+ boolean mustKeep(@NotNull Obj obj);
+
+ @NessieImmutable
+ interface CompositePurgeFilter extends PurgeFilter {
+ List<PurgeFilter> filters();
+
+ static CompositePurgeFilter compositePurgeFilter(PurgeFilter... filters) {
+ return ImmutableCompositePurgeFilter.of(List.of(filters));
+ }
+
+ static CompositePurgeFilter compositePurgeFilter(List<PurgeFilter> filters) {
+ return ImmutableCompositePurgeFilter.of(filters);
+ }
+
+ @Override
+ default boolean mustKeep(Obj obj) {
+ for (PurgeFilter filter : filters()) {
+ if (filter.mustKeep(obj)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Recommended default purge filter, which considers a {@link ReferencedObjectsFilter} and a
+ * maximum value of {@link Obj#referenced()}.
+ */
+ @NessieImmutable
+ interface ReferencedObjectsPurgeFilter extends PurgeFilter {
+ ReferencedObjectsFilter referencedObjects();
+
+ long maxObjReferenced();
+
+ static ReferencedObjectsPurgeFilter referencedObjectsPurgeFilter(
+ ReferencedObjectsFilter referencedObjects, long maxObjReferenced) {
+ return ImmutableReferencedObjectsPurgeFilter.of(referencedObjects, maxObjReferenced);
+ }
+
+ @Override
+ default boolean mustKeep(Obj obj) {
+ return obj.referenced() > maxObjReferenced()
+ || referencedObjects().isProbablyReferenced(obj.id());
+ }
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java
new file mode 100644
index 0000000000..57f230258c
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java
@@ -0,0 +1,31 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+public interface PurgeObjects {
+ PurgeResult purge();
+
+ /** Return the current statistics, returns a result after {@link #purge()} threw an exception. */
+ PurgeStats getStats();
+
+ /**
+ * Returns the estimated maximum heap pressure of this object tree. Considers the data
+ * structures that are required for the purge operation to work, a subset of the structures
+ * required for {@link ReferencedObjectsResolver#resolve()}. It is wrong to use the sum of {@link
+ * ReferencedObjectsResolver#estimatedHeapPressure()} and this value.
+ */
+ long estimatedHeapPressure();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java
new file mode 100644
index 0000000000..1d986a6957
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import jakarta.validation.constraints.NotNull;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+
+/**
+ * Holds the data structures and parameters that are needed for the {@linkplain PurgeObjects purge
+ * operation}.
+ *
+ * <p>Once the {@linkplain ReferencedObjectsResolver referenced objects have been resolved}, the
+ * data structures that are not needed for the purge operation should become eligible for Java GC,
+ * which is why this context object exists and holds less information than {@link
+ * ReferencedObjectsContext}.
+ */
+@NessieImmutable
+public interface PurgeObjectsContext {
+ @NotNull
+ Persist persist();
+
+ @NotNull
+ ReferencedObjectsFilter referencedObjects();
+
+ @NotNull
+ PurgeFilter purgeFilter();
+
+ int scanObjRatePerSecond();
+
+ int deleteObjRatePerSecond();
+
+ static PurgeObjectsContext purgeObjectsContext(
+ ReferencedObjectsContext referencedObjectsContext) {
+ return ImmutablePurgeObjectsContext.of(
+ referencedObjectsContext.persist(),
+ referencedObjectsContext.referencedObjects(),
+ referencedObjectsContext.purgeFilter(),
+ referencedObjectsContext.params().purgeScanObjRatePerSecond(),
+ referencedObjectsContext.params().purgeDeleteObjRatePerSecond(),
+ referencedObjectsContext.params().dryRun());
+ }
+
+ boolean dryRun();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java
new file mode 100644
index 0000000000..f443da1c16
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java
@@ -0,0 +1,139 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB;
+
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
+import org.projectnessie.versioned.storage.common.persist.CloseableIterator;
+import org.projectnessie.versioned.storage.common.persist.Obj;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class PurgeObjectsImpl implements PurgeObjects {
+ private static final Logger LOGGER = LoggerFactory.getLogger(PurgeObjectsImpl.class);
+
+ private final PurgeObjectsContext purgeObjectsContext;
+ private final PurgeStatsBuilder stats;
+ private final AtomicBoolean used = new AtomicBoolean();
+ private final RateLimit scanRateLimiter;
+ private final RateLimit purgeRateLimiter;
+
+ public PurgeObjectsImpl(
+ PurgeObjectsContext purgeObjectsContext, IntFunction rateLimitIntFunction) {
+ this.purgeObjectsContext = purgeObjectsContext;
+ this.stats = new PurgeStatsBuilder();
+ this.scanRateLimiter = rateLimitIntFunction.apply(purgeObjectsContext.scanObjRatePerSecond());
+ this.purgeRateLimiter =
+ rateLimitIntFunction.apply(purgeObjectsContext.deleteObjRatePerSecond());
+ }
+
+ @Override
+ public PurgeResult purge() {
+ checkState(used.compareAndSet(false, true), "resolve() has already been called.");
+
+ var purgeFilter = purgeObjectsContext.purgeFilter();
+ var persist = purgeObjectsContext.persist();
+ var clock = persist.config().clock();
+
+ LOGGER.info(
+ "Purging unreferenced objects in repository '{}', scanning {} objects per second, deleting {} objects per second, estimated context heap pressure: {}",
+ persist.config().repositoryId(),
+ scanRateLimiter,
+ purgeRateLimiter,
+ memSizeToStringMB(estimatedHeapPressure()));
+
+ PurgeStats finalStats = null;
+ try {
+ stats.started = clock.instant();
+ try (CloseableIterator iter = persist.scanAllObjects(Set.of())) {
+ while (iter.hasNext()) {
+ scanRateLimiter.acquire();
+ stats.numScannedObjs++;
+ var obj = iter.next();
+ if (purgeFilter.mustKeep(obj)) {
+ continue;
+ }
+
+ purgeRateLimiter.acquire();
+ purgeObj(obj);
+ }
+ } catch (RuntimeException e) {
+ stats.failure = e;
+ } finally {
+ stats.ended = clock.instant();
+ finalStats = stats.build();
+ }
+
+ LOGGER.info(
+ "Successfully finished purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}",
+ finalStats.duration(),
+ persist.config().repositoryId(),
+ finalStats,
+ memSizeToStringMB(estimatedHeapPressure()));
+ } catch (RuntimeException e) {
+ if (finalStats != null) {
+ LOGGER.warn(
+ "Error while purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}",
+ finalStats.duration(),
+ persist.config().repositoryId(),
+ finalStats,
+ memSizeToStringMB(estimatedHeapPressure()),
+ e);
+ } else {
+ LOGGER.warn(
+ "Error while purging unreferenced objects in repository '{}'",
+ persist.config().repositoryId(),
+ stats.failure);
+ }
+ throw e;
+ }
+
+ return ImmutablePurgeResult.of(stats.build());
+ }
+
+ @Override
+ public PurgeStats getStats() {
+ return stats.build();
+ }
+
+ @Override
+ public long estimatedHeapPressure() {
+ return purgeObjectsContext.referencedObjects().estimatedHeapPressure();
+ }
+
+ private void purgeObj(Obj obj) {
+ // TODO delete in parallel (multiple threads)
+ stats.numPurgedObjs++;
+
+ var persist = purgeObjectsContext.persist();
+
+ var objType = obj.type();
+ LOGGER.trace(
+ "Deleting obj {} of type {}/{} in repository '{}'",
+ obj.id(),
+ objType.name(),
+ objType.shortName(),
+ persist.config().repositoryId());
+
+ if (!purgeObjectsContext.dryRun()) {
+ persist.deleteWithReferenced(obj);
+ }
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java
new file mode 100644
index 0000000000..2e94a6e237
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java
@@ -0,0 +1,23 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.nessie.immutables.NessieImmutable;
+
/** Result of a {@linkplain PurgeObjects#purge() purge operation}. */
@NessieImmutable
public interface PurgeResult {
  /** Statistics collected while purging. */
  PurgeStats stats();
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java
new file mode 100644
index 0000000000..f597503e25
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+
+@NessieImmutable
+public interface PurgeStats {
+ Instant started();
+
+ Instant ended();
+
+ default Duration duration() {
+ return Duration.between(started(), ended());
+ }
+
+ /** Number of objects handled while scanning the Nessie repository. */
+ long numScannedObjs();
+
+ /**
+ * Number of purged (deleted) objects. For a {@linkplain CleanupParams#dryRun() dry-run}, this
+ * value indicates the number of objects that would have been deleted.
+ */
+ long numPurgedObjs();
+
+ Optional failure();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java
new file mode 100644
index 0000000000..b318b257f7
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java
@@ -0,0 +1,34 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.util.Optional;
+
/**
 * Mutable companion to {@link PurgeStats}, filled in while the purge operation runs.
 *
 * <p>Fields are plain (no synchronization); updated by the thread driving the purge.
 */
final class PurgeStatsBuilder {
  Instant started;
  Instant ended;

  // Failure that aborted the scan loop, if any.
  Exception failure;

  long numScannedObjs;
  long numPurgedObjs;

  /** Snapshots the current counters into an immutable {@link PurgeStats}. */
  PurgeStats build() {
    return ImmutablePurgeStats.of(
        started, ended, numScannedObjs, numPurgedObjs, Optional.ofNullable(failure));
  }
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java
new file mode 100644
index 0000000000..8df6112a16
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java
@@ -0,0 +1,50 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import com.google.common.util.concurrent.RateLimiter;
+
+public interface RateLimit {
+ void acquire();
+
+ @SuppressWarnings("UnstableApiUsage")
+ static RateLimit create(int ratePerSecond) {
+ if (ratePerSecond <= 0) {
+ return new RateLimit() {
+ @Override
+ public void acquire() {}
+
+ @Override
+ public String toString() {
+ return "unlimited";
+ }
+ };
+ }
+ return new RateLimit() {
+ final RateLimiter limiter = RateLimiter.create(ratePerSecond);
+
+ @Override
+ public void acquire() {
+ limiter.acquire();
+ }
+
+ @Override
+ public String toString() {
+ return "up to " + ratePerSecond;
+ }
+ };
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java
new file mode 100644
index 0000000000..171a9ac74d
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
/** Tracks a bounded set of recently seen {@linkplain ObjId object IDs}. */
public interface RecentObjIdFilter {
  /** Records the given ID, returning {@code true} if it was not already present. */
  boolean add(ObjId id);

  /** Returns {@code true} if the given ID was recently {@linkplain #add(ObjId) added}. */
  boolean contains(ObjId id);

  /** Returns the estimated maximum heap pressure of this object tree. */
  long estimatedHeapPressure();
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java
new file mode 100644
index 0000000000..bac4ff59fe
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP_ENTRY;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_POINTER;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+final class RecentObjIdFilterImpl implements RecentObjIdFilter {
+ private static final Object PRESENT = new Object();
+
+ private final LinkedHashMap recentObjIds;
+ private final long estimatedHeapPressure;
+
+ public RecentObjIdFilterImpl(int recentObjIdsFilterSize) {
+ int capacity = (int) Math.ceil(recentObjIdsFilterSize / 0.75d);
+ this.estimatedHeapPressure = calculateEstimatedHeapPressure(recentObjIdsFilterSize, capacity);
+
+ this.recentObjIds =
+ new LinkedHashMap<>(capacity) {
+ @Override
+ protected boolean removeEldestEntry(Map.Entry eldest) {
+ return size() >= recentObjIdsFilterSize;
+ }
+ };
+ }
+
+ @Override
+ public boolean contains(ObjId id) {
+ return recentObjIds.containsKey(id);
+ }
+
+ @Override
+ public boolean add(ObjId id) {
+ return recentObjIds.put(id, PRESENT) == null;
+ }
+
+ @Override
+ public long estimatedHeapPressure() {
+ return estimatedHeapPressure;
+ }
+
+ private long calculateEstimatedHeapPressure(int size, int capacity) {
+ int tableSize = -1 >>> Integer.numberOfLeadingZeros(capacity - 1);
+
+ return HEAP_SIZE_LINKED_HASH_MAP
+ +
+ // LHM entries
+ (HEAP_SIZE_LINKED_HASH_MAP_ENTRY + HEAP_SIZE_OBJ_ID) * size
+ // LHM table/node-array
+ + HEAP_SIZE_PRIMITIVE_LONG_ARRAY
+ + HEAP_SIZE_POINTER * tableSize;
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java
new file mode 100644
index 0000000000..6c7dcb9cec
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.cleanup.VisitedCommitFilter.ALLOW_DUPLICATE_TRAVERSALS;
+
+import jakarta.validation.constraints.NotNull;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+
/**
 * Holds the data structures and parameters that are needed for {@linkplain
 * ReferencedObjectsResolver resolving referenced objects}.
 */
@NessieImmutable
public interface ReferencedObjectsContext {
  /** Persistence layer of the repository being cleaned up. */
  @NotNull
  Persist persist();

  /** Filter that collects the IDs of all objects found to be referenced. */
  @NotNull
  ReferencedObjectsFilter referencedObjects();

  /** User-provided cleanup configuration. */
  @NotNull
  CleanupParams params();

  /** Filter deciding which objects must be kept during the later purge phase. */
  @NotNull
  PurgeFilter purgeFilter();

  /** Filter used to avoid traversing the same commit more than once. */
  @NotNull
  VisitedCommitFilter visitedCommitFilter();

  /** Factory assembling a resolve-phase context for the given repository and parameters. */
  static ReferencedObjectsContext objectsResolverContext(
      Persist persist,
      CleanupParams params,
      ReferencedObjectsFilter referencedObjects,
      PurgeFilter purgeFilter) {
    return ImmutableReferencedObjectsContext.of(
        persist,
        referencedObjects,
        params,
        purgeFilter,
        // No-op filter when duplicate commit traversals are allowed (presumably to save the
        // filter's heap cost - confirm against VisitedCommitFilterImpl).
        params.allowDuplicateCommitTraversals()
            ? ALLOW_DUPLICATE_TRAVERSALS
            : new VisitedCommitFilterImpl());
  }
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java
new file mode 100644
index 0000000000..07e359c96a
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import jakarta.validation.constraints.NotNull;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
/**
 * Mark {@linkplain ObjId object IDs} as referenced and allow checking whether object IDs are marked
 * as referenced.
 *
 * <p>The implementation is usually backed by a probabilistic data structure (bloom filter), which
 * means that there is a {@linkplain #expectedFpp() chance} that an unreferenced object is not
 * collected, but all referenced objects are guaranteed to remain.
 */
public interface ReferencedObjectsFilter {
  /** Marks the ID as referenced; {@code true} if the filter changed (ID probably not yet seen). */
  boolean markReferenced(@NotNull ObjId objId);

  /** Returns {@code true} if the ID was marked as referenced - or is a false positive. */
  boolean isProbablyReferenced(@NotNull ObjId objId);

  /** Whether the current false-positive probability is still within the configured maximum. */
  boolean withinExpectedFpp();

  /** Approximate number of distinct IDs added to this filter. */
  long approximateElementCount();

  /** Current expected false-positive probability of the backing data structure. */
  double expectedFpp();

  /** Returns the estimated maximum heap pressure of this object tree. */
  long estimatedHeapPressure();
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java
new file mode 100644
index 0000000000..506c78574b
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java
@@ -0,0 +1,117 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_ATOMIC_LONG_ARRAY;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BIT_ARRAY;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BLOOM_FILTER;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LONG_ADDER;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY;
+
+import com.google.common.hash.BloomFilter;
+import com.google.common.hash.PrimitiveSink;
+import java.util.concurrent.atomic.AtomicLong;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+@SuppressWarnings("UnstableApiUsage")
+final class ReferencedObjectsFilterImpl implements ReferencedObjectsFilter {
+
+ private final BloomFilter filter;
+ private final double allowedFalsePositiveProbability;
+ private final AtomicLong remainingElements;
+ private final long estimatedHeapPressure;
+
+ ReferencedObjectsFilterImpl(CleanupParams params) {
+ this.filter = createBloomFilter(params);
+ this.remainingElements = new AtomicLong(params.expectedObjCount());
+ this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability();
+ this.estimatedHeapPressure = calculateEstimatedHeapPressure(params);
+ }
+
+ static BloomFilter createBloomFilter(CleanupParams params) {
+ return BloomFilter.create(
+ ReferencedObjectsFilterImpl::funnel,
+ params.expectedObjCount(),
+ params.falsePositiveProbability());
+ }
+
+ private static void funnel(ObjId id, PrimitiveSink primitiveSink) {
+ var idSize = id.size();
+ var i = 0;
+ for (; idSize >= 8; idSize -= 8) {
+ primitiveSink.putLong(id.longAt(i++));
+ }
+ i <<= 3;
+ for (; idSize > 0; idSize--) {
+ primitiveSink.putByte(id.byteAt(i++));
+ }
+ }
+
+ @Override
+ public boolean markReferenced(ObjId objId) {
+ if (filter.put(objId)) {
+ if (remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) {
+ return true;
+ }
+ throw new MustRestartWithBiggerFilterRuntimeException(
+ "Bloom filter exceeded the configured expected FPP");
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isProbablyReferenced(ObjId objId) {
+ return filter.mightContain(objId);
+ }
+
+ @Override
+ public boolean withinExpectedFpp() {
+ return expectedFpp() <= allowedFalsePositiveProbability;
+ }
+
+ @Override
+ public long approximateElementCount() {
+ return filter.approximateElementCount();
+ }
+
+ @Override
+ public double expectedFpp() {
+ return filter.expectedFpp();
+ }
+
+ @Override
+ public long estimatedHeapPressure() {
+ return estimatedHeapPressure;
+ }
+
+ private static long calculateEstimatedHeapPressure(CleanupParams params) {
+ var bits = optimalNumOfBits(params.expectedObjCount(), params.falsePositiveProbability());
+ var arrayLen = bits / 64 + 1;
+ return HEAP_SIZE_BLOOM_FILTER
+ + HEAP_SIZE_BIT_ARRAY
+ + HEAP_SIZE_LONG_ADDER
+ + HEAP_SIZE_ATOMIC_LONG_ARRAY
+ + HEAP_SIZE_PRIMITIVE_LONG_ARRAY * arrayLen;
+ }
+
+ // See com.google.common.hash.BloomFilter.optimalNumOfBits
+ private static long optimalNumOfBits(long expectedInsertions, double fpp) {
+ if (fpp == 0) {
+ fpp = Double.MIN_VALUE;
+ }
+ return (long) (-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2)));
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java
new file mode 100644
index 0000000000..ce3906933b
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.versioned.storage.common.persist.Persist;
+
/** Walks the repository and marks every reachable object as referenced. */
public interface ReferencedObjectsResolver {
  /**
   * Identifies all referenced objects in the {@linkplain Persist Nessie repository}.
   *
   * @return result containing the information for the follow-up {@linkplain PurgeObjects#purge()
   *     purge operation} and stats.
   * @throws MustRestartWithBiggerFilterException thrown if this operation identifies more than the
   *     configured {@linkplain CleanupParams#expectedObjCount() expected object count}. This
   *     exception must be handled by calling code
   */
  ResolveResult resolve() throws MustRestartWithBiggerFilterException;

  /**
   * Return the current statistics, returns a valid result, even if {@link #resolve()} threw an
   * exception.
   */
  ResolveStats getStats();

  /**
   * Returns the estimated maximum heap pressure of this object tree. Considers the data
   * structures that are required for the resolve operation to work, a superset of the structures
   * required for {@link PurgeObjects#purge()}. It is wrong to use the sum of {@link
   * PurgeObjects#estimatedHeapPressure()} and this value.
   */
  long estimatedHeapPressure();
}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java
new file mode 100644
index 0000000000..e0f3f833d4
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java
@@ -0,0 +1,369 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB;
+import static org.projectnessie.versioned.storage.cleanup.PurgeObjectsContext.purgeObjectsContext;
+import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery;
+import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic;
+import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery;
+import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.VALUE;
+import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID;
+
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntFunction;
+import org.agrona.collections.ObjectHashSet;
+import org.projectnessie.model.Content;
+import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement;
+import org.projectnessie.versioned.storage.common.logic.CommitLogic;
+import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
+import org.projectnessie.versioned.storage.common.objtypes.CommitOp;
+import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj;
+import org.projectnessie.versioned.storage.common.persist.Obj;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+import org.projectnessie.versioned.storage.common.persist.Reference;
+import org.projectnessie.versioned.store.DefaultStoreWorker;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+final class ReferencedObjectsResolverImpl implements ReferencedObjectsResolver {
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReferencedObjectsResolverImpl.class);
+
+ private final ObjectHashSet pendingObjs = new ObjectHashSet<>();
+ private final Deque pendingHeads = new ArrayDeque<>();
+
+ /**
+ * Set of recently handled 'ObjId's to prevent re-processing the same objects multiple times. This
+ * happens, when the values referenced from the commit index are iterated, because it iterates
+ * over all keys, not only the keys added by a particular commit.
+ */
+ private final RecentObjIdFilter recentObjIds;
+
+ private final ReferencedObjectsContext referencedObjectsContext;
+
+ private final ResolveStatsBuilder stats;
+ private final RateLimit commitRateLimiter;
+ private final RateLimit objRateLimiter;
+
+ private final AtomicBoolean used = new AtomicBoolean();
+
+ ReferencedObjectsResolverImpl(
+ ReferencedObjectsContext referencedObjectsContext,
+ IntFunction rateLimitIntFunction) {
+ this.referencedObjectsContext = referencedObjectsContext;
+ this.stats = new ResolveStatsBuilder();
+ this.commitRateLimiter =
+ rateLimitIntFunction.apply(referencedObjectsContext.params().resolveCommitRatePerSecond());
+ this.objRateLimiter =
+ rateLimitIntFunction.apply(referencedObjectsContext.params().resolveObjRatePerSecond());
+ this.recentObjIds =
+ new RecentObjIdFilterImpl(referencedObjectsContext.params().recentObjIdsFilterSize());
+ }
+
+ @Override
+ public long estimatedHeapPressure() {
+ return referencedObjectsContext.referencedObjects().estimatedHeapPressure()
+ + referencedObjectsContext.visitedCommitFilter().estimatedHeapPressure()
+ + recentObjIds.estimatedHeapPressure();
+ }
+
+ @Override
+ public ResolveResult resolve() throws MustRestartWithBiggerFilterException {
+ checkState(used.compareAndSet(false, true), "resolve() has already been called.");
+
+ LOGGER.info(
+ "Identifying referenced objects in repository '{}', processing {} commits per second, processing {} objects per second, estimated context heap pressure: {}",
+ referencedObjectsContext.persist().config().repositoryId(),
+ commitRateLimiter,
+ objRateLimiter,
+ memSizeToStringMB(estimatedHeapPressure()));
+
+ var persist = referencedObjectsContext.persist();
+ var params = referencedObjectsContext.params();
+
+ ResolveStats finalStats = null;
+ try {
+ finalStats = doResolve(persist, params);
+
+ LOGGER.info(
+ "Successfully finished identifying referenced objects after {} in repository '{}', resolve stats: {}, estimated context heap pressure: {}",
+ finalStats.duration(),
+ persist.config().repositoryId(),
+ finalStats,
+ memSizeToStringMB(estimatedHeapPressure()));
+ } catch (MustRestartWithBiggerFilterRuntimeException mustRestart) {
+ LOGGER.warn(
+ "Must restart identifying referenced objects for repository '{}', current parameters: expected object count: {}, FPP: {}, allowed FPP: {}, resolve stats: {}, estimated context heap pressure: {}",
+ persist.config().repositoryId(),
+ params.expectedObjCount(),
+ params.falsePositiveProbability(),
+ params.allowedFalsePositiveProbability(),
+ finalStats,
+ memSizeToStringMB(estimatedHeapPressure()));
+ throw new MustRestartWithBiggerFilterException(mustRestart.getMessage(), mustRestart);
+ } catch (RuntimeException e) {
+ if (finalStats != null) {
+ LOGGER.warn(
+ "Error while identifying referenced objects after {} in repository '{}', stats: {}, estimated context heap pressure: {}",
+ finalStats.duration(),
+ persist.config().repositoryId(),
+ finalStats,
+ memSizeToStringMB(estimatedHeapPressure()),
+ e);
+ } else {
+ LOGGER.warn(
+ "Error while identifying referenced objects after {} in repository '{}'",
+ persist.config().repositoryId(),
+ e);
+ }
+ throw e;
+ }
+
+ return ImmutableResolveResult.of(stats.build(), purgeObjectsContext(referencedObjectsContext));
+ }
+
+ private ResolveStats doResolve(Persist persist, CleanupParams params) {
+ var clock = persist.config().clock();
+
+ ResolveStats finalStats;
+ try {
+ stats.started = clock.instant();
+
+ checkState(
+ repositoryLogic(persist).repositoryExists(),
+ "The provided repository has not been initialized.");
+
+ params.relatedObjects().repositoryRelatedObjects().forEach(this::pendingObj);
+
+ var referenceLogic = referenceLogic(persist);
+ var commitLogic = commitLogic(persist);
+
+ for (String internalReferenceName : params.internalReferenceNames()) {
+ var intRef = persist.fetchReference(internalReferenceName);
+ checkState(intRef != null, "Internal reference %s not found!", internalReferenceName);
+ handleReference(intRef);
+ processPendingHeads(commitLogic);
+ }
+
+ for (var referencesIter = referenceLogic.queryReferences(referencesQuery());
+ referencesIter.hasNext(); ) {
+ var reference = referencesIter.next();
+ handleReference(reference);
+ processPendingHeads(commitLogic);
+ }
+
+ processPendingHeads(commitLogic);
+
+ while (!pendingObjs.isEmpty()) {
+ processPendingObjs();
+ }
+ } catch (RuntimeException e) {
+ stats.mustRestart = e instanceof MustRestartWithBiggerFilterRuntimeException;
+ stats.failure = e;
+ throw e;
+ } finally {
+ stats.ended = clock.instant();
+ finalStats = stats.build();
+ }
+ return finalStats;
+ }
+
+ private void processPendingHeads(CommitLogic commitLogic) {
+ while (!pendingHeads.isEmpty()) {
+ var head = pendingHeads.removeFirst();
+ commitLogic.commitLog(commitLogQuery(head)).forEachRemaining(this::handleCommit);
+ }
+ }
+
+ @Override
+ public ResolveStats getStats() {
+ return stats.build();
+ }
+
+ private void handleReference(Reference reference) {
+ stats.numReferences++;
+
+ var persist = referencedObjectsContext.persist();
+
+ if (reference.deleted()) {
+ LOGGER.trace(
+ "Skipping deleted reference {} in repository '{}'",
+ reference.name(),
+ persist.config().repositoryId());
+ return;
+ }
+
+ LOGGER.debug(
+ "Walking reference {} in repository '{}' starting at commit {}",
+ reference.name(),
+ persist.config().repositoryId(),
+ reference.pointer());
+
+ referencedObjectsContext
+ .params()
+ .relatedObjects()
+ .referenceRelatedObjects(reference)
+ .forEach(this::pendingObj);
+
+ commitChain(reference.pointer());
+
+ var extendedInfo = reference.extendedInfoObj();
+ if (extendedInfo != null) {
+ referencedObjectsContext.referencedObjects().markReferenced(extendedInfo);
+ }
+ }
+
+ private void commitChain(ObjId head) {
+ if (EMPTY_OBJ_ID.equals(head)) {
+ // Prevent visiting the same commit more often than once
+ return;
+ }
+
+ stats.numCommitChainHeads++;
+
+ if (referencedObjectsContext.visitedCommitFilter().alreadyVisited(head)) {
+ // Prevent visiting the same commit more often than once
+ return;
+ }
+
+ pendingHeads.addLast(head);
+ }
+
+ private void handleCommit(CommitObj commit) {
+ stats.numCommits++;
+
+ if (!referencedObjectsContext.visitedCommitFilter().mustVisit(commit.id())) {
+ // Prevent visiting the same commit more often than once
+ return;
+ }
+
+ commitRateLimiter.acquire();
+
+ var persist = referencedObjectsContext.persist();
+
+ LOGGER.debug(
+ "Handling commit {} in repository '{}'", commit.id(), persist.config().repositoryId());
+
+ stats.numUniqueCommits++;
+
+ referencedObjectsContext.referencedObjects().markReferenced(commit.id());
+
+ referencedObjectsContext
+ .params()
+ .relatedObjects()
+ .commitRelatedObjects(commit)
+ .forEach(this::pendingObj);
+
+ var indexesLogic = indexesLogic(referencedObjectsContext.persist());
+ var index = indexesLogic.buildCompleteIndexOrEmpty(commit);
+ for (StoreIndexElement indexElement : index) {
+ var content = indexElement.content();
+ if (content.action().exists()) {
+ var value = content.value();
+ pendingObj(value);
+ }
+ }
+
+ commit.secondaryParents().forEach(this::commitChain);
+ }
+
+ private void pendingObj(ObjId objId) {
+ if (recentObjIds.contains(objId)) {
+ return;
+ }
+
+ if (!pendingObjs.add(objId)) {
+ return;
+ }
+
+ stats.numQueuedObjs++;
+
+ if (pendingObjs.size() >= referencedObjectsContext.params().pendingObjsBatchSize()) {
+ processPendingObjs();
+ }
+ }
+
+ private void processPendingObjs() {
+ stats.numQueuedObjsBulkFetches++;
+
+ var persist = referencedObjectsContext.persist();
+
+ LOGGER.debug(
+ "Fetching {} pending objects in repository '{}'",
+ pendingObjs.size(),
+ persist.config().repositoryId());
+
+ var objs = persist.fetchObjsIfExist(pendingObjs.toArray(ObjId[]::new));
+ // Must clear 'pendingObjs' here, because handleObj can add more objects to it
+ pendingObjs.clear();
+
+ for (Obj obj : objs) {
+ if (obj != null) {
+ handleObj(obj);
+ }
+ }
+ }
+
+ private void handleObj(Obj obj) {
+ objRateLimiter.acquire();
+
+ if (!recentObjIds.add(obj.id())) {
+ // already handled
+ return;
+ }
+
+ stats.numObjs++;
+
+ var persist = referencedObjectsContext.persist();
+
+ var objType = obj.type();
+
+ LOGGER.debug(
+ "Handling obj {} of type {}/{} in repository '{}'",
+ obj.id(),
+ objType.name(),
+ objType.shortName(),
+ persist.config().repositoryId());
+
+ referencedObjectsContext.referencedObjects().markReferenced(obj.id());
+
+ if (VALUE.equals(objType)) {
+ var contentValueObj = (ContentValueObj) obj;
+ var content =
+ DefaultStoreWorker.instance()
+ .valueFromStore(contentValueObj.payload(), contentValueObj.data());
+
+ handleContent(content);
+ }
+ }
+
+ private void handleContent(Content content) {
+ stats.numContents++;
+
+ referencedObjectsContext
+ .params()
+ .relatedObjects()
+ .contentRelatedObjects(content)
+ .forEach(this::pendingObj);
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java
new file mode 100644
index 0000000000..120c791c76
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.nessie.immutables.NessieImmutable;
+
+@NessieImmutable
+public interface ResolveResult {
+ /** Statistics collected while resolving the referenced objects. */
+ ResolveStats stats();
+
+ /** Context required for a purge. */
+ PurgeObjectsContext purgeObjectsContext();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java
new file mode 100644
index 0000000000..dd61216849
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Optional;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+
+@NessieImmutable
+public interface ResolveStats {
+  /** Timestamp when the resolve operation started. */
+  Instant started();
+
+  /** Timestamp when the resolve operation ended. */
+  Instant ended();
+
+  /** Wall-clock duration of the resolve operation. */
+  default Duration duration() {
+    return Duration.between(started(), ended());
+  }
+
+  /** Whether the resolve must be restarted with a bigger (object-count) filter. */
+  boolean mustRestart();
+
+  /** Number of processed references, including Nessie internal references. */
+  long numReferences();
+
+  /** Number of commit chain "heads". */
+  long numCommitChainHeads();
+
+  /** Number of processed commit objects, including Nessie internal commits. */
+  long numCommits();
+
+  /** Number of processed unique commit objects, including Nessie internal commits. */
+  long numUniqueCommits();
+
+  /** Number of non-commit objects. */
+  long numObjs();
+
+  /** Number of {@link org.projectnessie.model.Content} objects. */
+  long numContents();
+
+  /** Number of non-commit objects that had been queued for batched commit object handling. */
+  long numQueuedObjs();
+
+  /** Number of bulk non-commit object fetches. */
+  long numQueuedObjsBulkFetches();
+
+  /** Failure that aborted the resolve operation, if any. */
+  Optional<Exception> failure();
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java
new file mode 100644
index 0000000000..bc2fef3ada
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java
@@ -0,0 +1,52 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.util.Optional;
+
+// Mutable accumulator for ResolveStats; fields are written directly by
+// ReferencedObjectsResolverImpl while it walks the repository. Not thread-safe.
+final class ResolveStatsBuilder {
+ // Start/end timestamps of the resolve run ('ended' is set even on failure).
+ Instant started;
+ Instant ended;
+
+ // Set when the resolve must be restarted with a bigger filter.
+ boolean mustRestart;
+ // The exception that aborted the resolve run, if any.
+ Exception failure;
+
+ long numReferences;
+ long numCommitChainHeads;
+ long numCommits;
+ long numUniqueCommits;
+ long numObjs;
+ long numContents;
+ long numQueuedObjs;
+ long numQueuedObjsBulkFetches;
+
+ // Builds an immutable snapshot of the current counters; argument order must match the
+ // positional factory ImmutableResolveStats.of(), i.e. the declaration order in ResolveStats.
+ ResolveStats build() {
+ return ImmutableResolveStats.of(
+ started,
+ ended,
+ mustRestart,
+ numReferences,
+ numCommitChainHeads,
+ numCommits,
+ numUniqueCommits,
+ numObjs,
+ numContents,
+ numQueuedObjs,
+ numQueuedObjsBulkFetches,
+ Optional.ofNullable(failure));
+ }
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java
new file mode 100644
index 0000000000..5570d8fc1b
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import org.projectnessie.versioned.storage.common.objtypes.CommitObj;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+/**
+ * Filter to prevent processing the same {@linkplain CommitObj Nessie commit} more than once.
+ *
+ * <p>There are two implementations of this interface: {@linkplain #ALLOW_DUPLICATE_TRAVERSALS one}
+ * that does not prevent duplicate processing, and {@linkplain VisitedCommitFilterImpl the
+ * default one} that does. The parameter {@link CleanupParams#allowDuplicateCommitTraversals()} is
+ * used to decide which implementation is being used.
+ */
+public interface VisitedCommitFilter {
+ /** Records the commit as visited; returns {@code true} if it was not visited before. */
+ boolean mustVisit(ObjId commitObjId);
+
+ /** Returns {@code true} if the commit was already recorded via {@link #mustVisit(ObjId)}. */
+ boolean alreadyVisited(ObjId commitObjId);
+
+ /** Estimated heap usage of this filter's internal data structures, in bytes. */
+ long estimatedHeapPressure();
+
+ /** No-op filter: never records anything, so every commit is traversed again. */
+ VisitedCommitFilter ALLOW_DUPLICATE_TRAVERSALS =
+ new VisitedCommitFilter() {
+ @Override
+ public boolean mustVisit(ObjId commitObjId) {
+ return true;
+ }
+
+ @Override
+ public boolean alreadyVisited(ObjId commitObjId) {
+ return false;
+ }
+
+ @Override
+ public long estimatedHeapPressure() {
+ return 0;
+ }
+ };
+}
diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java
new file mode 100644
index 0000000000..c5efb936aa
--- /dev/null
+++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.agrona.collections.Hashing.DEFAULT_LOAD_FACTOR;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJECT_HASH_SET;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID;
+import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_OBJ_ARRAY;
+
+import org.agrona.collections.ObjectHashSet;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+/** Exact (non-probabilistic) {@link VisitedCommitFilter} backed by a hash set of commit IDs. */
+final class VisitedCommitFilterImpl implements VisitedCommitFilter {
+  private final ObjectHashSet<ObjId> visited = new ObjectHashSet<>(64, DEFAULT_LOAD_FACTOR);
+
+  @Override
+  public boolean mustVisit(ObjId commitObjId) {
+    // 'add' returns true only for IDs not seen before.
+    return visited.add(commitObjId);
+  }
+
+  @Override
+  public boolean alreadyVisited(ObjId commitObjId) {
+    return visited.contains(commitObjId);
+  }
+
+  @Override
+  public long estimatedHeapPressure() {
+    // Fixed set overhead + backing array sized by capacity + one ObjId per stored element.
+    var sz = visited.size();
+    var cap = visited.capacity();
+    return HEAP_SIZE_OBJECT_HASH_SET + HEAP_SIZE_PRIMITIVE_OBJ_ARRAY * cap + HEAP_SIZE_OBJ_ID * sz;
+  }
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java
new file mode 100644
index 0000000000..3821674cf7
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java
@@ -0,0 +1,516 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.UUID.randomUUID;
+import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8;
+import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup;
+import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key;
+import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd;
+import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder;
+import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic;
+import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders;
+import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL;
+import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue;
+import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData;
+import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID;
+import static org.projectnessie.versioned.testworker.OnRefOnly.onRef;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.projectnessie.nessie.immutables.NessieImmutable;
+import org.projectnessie.versioned.storage.common.logic.CommitLogic;
+import org.projectnessie.versioned.storage.common.logic.InternalRef;
+import org.projectnessie.versioned.storage.common.objtypes.Compression;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+import org.projectnessie.versioned.storage.testextension.NessiePersist;
+import org.projectnessie.versioned.storage.testextension.PersistExtension;
+import org.projectnessie.versioned.store.DefaultStoreWorker;
+
+@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class})
+public class TestCleanup {
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ @NessiePersist protected Persist persist;
+
+ @Test
+ void mustRestartWithBiggerFilterThrown() {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var maxObjReferenced = persist.config().currentTimeMicros();
+
+ // An expected object count of 1 is intentionally too small, so the resolver must detect the
+ // undersized filter and demand a restart.
+ var cleanupParams = CleanupParams.builder().expectedObjCount(1).build();
+ var cleanup = createCleanup(cleanupParams);
+ var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+ var referencedObjectsResolver =
+ cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+
+ soft.assertThatThrownBy(referencedObjectsResolver::resolve)
+ .isInstanceOf(MustRestartWithBiggerFilterException.class);
+
+ var newCleanupParams = cleanupParams.withIncreasedExpectedObjCount();
+
+ // 'withIncreasedExpectedObjCount' must raise the count and change nothing else.
+ soft.assertThat(cleanupParams.expectedObjCount())
+ .isLessThan(newCleanupParams.expectedObjCount());
+ soft.assertThat(
+ CleanupParams.builder()
+ .from(cleanupParams)
+ .expectedObjCount(newCleanupParams.expectedObjCount())
+ .build())
+ .isEqualTo(newCleanupParams);
+
+ // Retrying with the increased expected object count must succeed.
+ cleanup = createCleanup(newCleanupParams);
+ referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+ referencedObjectsResolver = cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+
+ soft.assertThatCode(referencedObjectsResolver::resolve).doesNotThrowAnyException();
+ }
+
+ @Test
+ void estimatedHeapPressure() throws Exception {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var maxObjReferenced = persist.config().currentTimeMicros();
+
+ var cleanup = createCleanup(CleanupParams.builder().build());
+ var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+ var referencedObjectsResolver =
+ cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+
+ soft.assertThat(referencedObjectsResolver.estimatedHeapPressure()).isGreaterThan(1L);
+
+ var resolveResult = referencedObjectsResolver.resolve();
+ var purge = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+
+ // The purge phase needs only a subset of the resolve phase's data structures, so its heap
+ // pressure estimate must be strictly smaller (but non-trivial).
+ soft.assertThat(purge.estimatedHeapPressure())
+ .isGreaterThan(1L)
+ .isLessThan(referencedObjectsResolver.estimatedHeapPressure());
+ }
+
+ @Test
+ void againstEmptyRepository() throws Exception {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ // A freshly initialized repository must yield no purgeable objects; the counts below
+ // reflect the repository-initialization objects only.
+ var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // refs
+ 3L,
+ // HEADs ("main" has EMPTY_OBJ_ID)
+ 2L,
+ // commits
+ 3L,
+ // unique commits
+ 3L,
+ // queued objs
+ 2L,
+ // objs
+ 2L);
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(Optional.empty(), 5L, 0L);
+ }
+
+ @Test
+ void purgeDeleteRefObjs() throws Exception {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var referenceLogic = referenceLogic(persist);
+ var commitLogic = commitLogic(persist);
+
+ // Create 20 references: 10 that stay, 10 that will be deleted later.
+ for (int i = 0; i < 10; i++) {
+ referenceLogic.createReference("kept-" + i, EMPTY_OBJ_ID, null);
+ }
+ for (int i = 0; i < 10; i++) {
+ referenceLogic.createReference("deleted-" + i, EMPTY_OBJ_ID, null);
+ }
+
+ var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 3 references (empty repo) + 20 created references
+ 3L + 20L,
+ // 2 commit chain heads (2 internal refs; "main" + all created refs have EMPTY_OBJ_ID)
+ 2L,
+ // 3 commits (empty repo) + 20 created references CommitObjs
+ 3L + 20L,
+ 3L + 20L,
+ // 2 objs (empty repo) + 20 created RefObj's
+ 2L + 20L,
+ 2L + 20L);
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 5 (empty repo) + 20 CommitObj + 20 RefObj
+ 5L + 20L + 20L,
+ // Nothing to delete
+ 0L);
+
+ // Delete 10 references; their RefObj's stay reachable via the `int/refs` commit log.
+ for (int i = 0; i < 10; i++) {
+ referenceLogic.deleteReference("deleted-" + i, EMPTY_OBJ_ID);
+ }
+
+ resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 3 references (empty repo) + 10 remaining (non-deleted) references
+ 3L + 10L,
+ // 2 commit chain heads (2 internal refs; "main" + all created refs have EMPTY_OBJ_ID)
+ 2L,
+ // 3 commits (empty repo) + 20 created references CommitObjs + 10 deleted references
+ // CommitObjs
+ 3L + 20L + 10L,
+ 3L + 20L + 10L,
+ // 2 objs (empty repo) + 20 created RefObj's
+ 2L + 20L,
+ 2L + 20L);
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj
+ 5L + 20L + 20L + 10L,
+ // RefObj's are NOT deleted, because those are referenced via the `int/refs` commit log
+ // chain
+ 0L);
+
+ // Shorten the "int/refs" history / make RefObj's eligible for cleanup
+
+ var refRefs = requireNonNull(persist.fetchReference(InternalRef.REF_REFS.name()));
+ var newRefRefs = referenceLogic.rewriteCommitLog(refRefs, (num, commit) -> true);
+ soft.assertThat(newRefRefs.pointer()).isNotEqualTo(refRefs.pointer());
+ var refRefsHead = requireNonNull(commitLogic.fetchCommit(newRefRefs.pointer()));
+ soft.assertThat(refRefsHead.directParent()).isEqualTo(EMPTY_OBJ_ID);
+
+ resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros());
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 3 references (empty repo) + 10 remaining (non-deleted) references
+ 3L + 10L,
+ // 2 commit chain heads (2 internal refs; "main" + all created refs have EMPTY_OBJ_ID)
+ 2L,
+ // 2 CommitObjs (one less than "empty repo": the commit to create the "main" reference
+ // has been "squashed")
+ 2L,
+ 2L,
+ // 2 objs (empty repo) + 10 "existing" RefObj's
+ 2L + 10L,
+ 2L + 10L);
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(
+ Optional.empty(),
+ // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 1 re-written CommitObj
+ 5L + 20L + 20L + 10L + 1L,
+ // RefObj's are deleted, because those are no longer referenced via the `int/refs`
+ // commit log chain, plus CommitObj's from the create/delete reference operations:
+ // 10 RefObj's + 30 CommitObj + 2 CommitObj
+ 10L + 30L + 2L);
+ }
+
+ @Test
+ void againstEmptyRepositoryWithGarbage() throws Exception {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var referenceLogic = referenceLogic(persist);
+ var commitLogic = commitLogic(persist);
+
+ var unreferenced = new ArrayList<ObjId>(); // typed lists avoid raw-type unchecked warnings
+ var keptUnreferenced = new ArrayList<ObjId>();
+ var referencedCommits = new ArrayList<ObjId>();
+ var referenced = new ArrayList<ObjId>();
+ var contents = 0;
+
+ for (int i = 0; i < 25; i++) {
+ var obj =
+ stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ unreferenced.add(obj.id());
+ }
+ for (int i = 0; i < 25; i++) {
+ var cid = randomUUID();
+ var obj =
+ contentValue(
+ cid.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("dummy " + i, cid.toString())));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ unreferenced.add(obj.id());
+ }
+
+ // 10 new references
+ // 10 new RefObj
+ for (int i = 0; i < 10; i++) {
+ var head = EMPTY_OBJ_ID;
+ for (int i1 = 0; i1 < 20; i1++) {
+ var cid1 = randomUUID();
+ var cid2 = randomUUID();
+ var obj1 =
+ contentValue(
+ cid1.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString())));
+ var obj2 =
+ contentValue(
+ cid2.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString())));
+ var commit =
+ commitLogic.doCommit(
+ newCommitBuilder()
+ .commitType(NORMAL)
+ .parentCommitId(head)
+ .addAdds(
+ commitAdd(
+ key("store", "key", Integer.toString(i), Integer.toString(i1), "1"),
+ 42,
+ obj1.id(),
+ null,
+ cid1))
+ .addAdds(
+ commitAdd(
+ key("store", "key", Integer.toString(i), Integer.toString(i1), "2"),
+ 42,
+ obj2.id(),
+ null,
+ cid2))
+ .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build())
+ .message("commit " + i1 + " on " + i)
+ .build(),
+ List.of(obj1, obj2));
+ head = requireNonNull(commit).id();
+
+ referencedCommits.add(head);
+ referenced.add(obj1.id());
+ referenced.add(obj2.id());
+ contents += 2;
+ }
+
+ var extendedInfo =
+ stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i));
+ soft.assertThat(persist.storeObj(extendedInfo)).isTrue();
+ referenced.add(extendedInfo.id());
+
+ referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id());
+ }
+
+ var maxObjReferenced = persist.config().currentTimeMicros();
+
+ // Unreferenced, but newer than 'maxObjReferenced'
+ for (int i = 100; i < 125; i++) {
+ var obj =
+ stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ keptUnreferenced.add(obj.id());
+ }
+ for (int i = 100; i < 125; i++) {
+ var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ keptUnreferenced.add(obj.id());
+ }
+
+ var resolveAndPurge = resolveAndPurge(maxObjReferenced);
+
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // refs
+ 3L + 10L,
+ // heads ("main" has EMPTY_OBJ_ID)
+ 2L + 10L,
+ // commits
+ 3L + 10L + referencedCommits.size(),
+ // unique commits
+ 3L + 10L + referencedCommits.size(),
+ // objects + non-existing UniqueObj
+ 2L + referenced.size() + contents,
+ 2L + referenced.size());
+
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(
+ Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L);
+
+ soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0])))
+ .containsOnlyNulls();
+ soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0])))
+ .doesNotContainNull();
+ soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0])))
+ .doesNotContainNull();
+ }
+
+ @Test
+ void withSecondaryParents() throws Exception { // verifies merge-commit secondary parents are followed during resolve
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var referenceLogic = referenceLogic(persist);
+ var commitLogic = commitLogic(persist);
+
+ var secondaryHead = buildNewCommitChain(commitLogic, "secondary"); // 5 commits, 5 contents
+ var referenceHead = buildNewCommitChain(commitLogic, "main"); // 5 commits, 5 contents
+
+ var mergeCommit =
+ commitLogic.doCommit(
+ newCommitBuilder()
+ .commitType(NORMAL)
+ .parentCommitId(referenceHead)
+ .addSecondaryParents(secondaryHead) // merge parent: HEAD of the "secondary" chain
+ .message("merge commit")
+ .headers(newCommitHeaders().add("created", "foo merge").build())
+ .build(),
+ List.of());
+
+ referenceLogic.createReference("refs/heads/my-merge-1", requireNonNull(mergeCommit).id(), null);
+ referenceLogic.createReference("refs/heads/my-merge-2", requireNonNull(mergeCommit).id(), null);
+
+ var maxObjReferenced = persist.config().currentTimeMicros();
+ var resolveAndPurge = resolveAndPurge(maxObjReferenced);
+
+ soft.assertThat(resolveAndPurge.resolveResult().stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // references
+ 3L + 1L + 1L,
+ // commit heads (all refs HEADs + secondary parent + incl duplicates & EMPTY_OBJ_ID)
+ 3L + 2L,
+ // commits (internals + 2x create-ref + 5+5 + 1)
+ 3L + 2L + 5L + 5L + 1L,
+ 3L + 2L + 5L + 5L + 1L,
+ // objects (internals, 2x RefObj + 5+5 contents + 10 non-existing UniqueObj
+ 2L + 2L + 5L + 5L + 10L,
+ 2L + 2L + 5L + 5L);
+
+ soft.assertThat(resolveAndPurge.purgeResult().stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(Optional.empty(), 5L + 13L + 12L, 0L); // everything reachable, nothing purged
+ }
+
+ private ObjId buildNewCommitChain(CommitLogic commitLogic, String discrim) throws Exception { // builds 5 linear commits (one content each) from EMPTY_OBJ_ID, returns chain HEAD
+ var head = EMPTY_OBJ_ID;
+ for (int i = 0; i < 5; i++) {
+ var cid1 = randomUUID();
+ var obj1 =
+ contentValue(
+ cid1.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("obj " + i + " " + discrim, cid1.toString())));
+ var commit =
+ commitLogic.doCommit(
+ newCommitBuilder()
+ .commitType(NORMAL)
+ .parentCommitId(head)
+ .addAdds(
+ commitAdd(
+ key("store", "key", Integer.toString(i), discrim), // discrim keeps keys unique per chain
+ 42,
+ obj1.id(),
+ null,
+ cid1))
+ .headers(newCommitHeaders().add("created", "foo-" + i + "-" + discrim).build())
+ .message("commit " + i + " " + discrim)
+ .build(),
+ List.of(obj1));
+ head = requireNonNull(commit).id();
+ }
+ return head;
+ }
+
+ ResolvePurgeResult resolveAndPurge(long maxObjReferenced) throws Exception { // runs a full resolve+purge cycle with default CleanupParams
+ var cleanup = createCleanup(CleanupParams.builder().build());
+ var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+ var referencedObjectsResolver =
+ cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+ var resolveResult = referencedObjectsResolver.resolve(); // marks all reachable objects
+ var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+ var purgeResult = purgeObjects.purge(); // deletes unmarked objects older than maxObjReferenced
+
+ return ImmutableResolvePurgeResult.of(resolveResult, purgeResult);
+ }
+
+ @NessieImmutable
+ interface ResolvePurgeResult { // value tuple bundling both cleanup phases' outcomes for assertions
+ ResolveResult resolveResult();
+
+ PurgeResult purgeResult();
+ }
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java
new file mode 100644
index 0000000000..179cd07052
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestPurgeStatsBuilder {
+ @InjectSoftAssertions SoftAssertions soft;
+
+ @Test
+ void purgeStatsBuilder() { // mirror every mutable builder field into the immutable builder, then compare the built results
+ var builder = new PurgeStatsBuilder();
+
+ var expected = ImmutablePurgeStats.builder();
+
+ builder.started = Instant.EPOCH;
+ expected.started(Instant.EPOCH);
+ builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS);
+ expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS));
+ builder.numPurgedObjs = 1;
+ expected.numPurgedObjs(1);
+ builder.numScannedObjs = 2;
+ expected.numScannedObjs(2);
+ builder.failure = new Exception("hello");
+ expected.failure(builder.failure); // same instance, so equals() holds for the Optional<Exception>
+
+ soft.assertThat(builder.build()).isEqualTo(expected.build());
+ }
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java
new file mode 100644
index 0000000000..1d31a14426
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java
@@ -0,0 +1,240 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.UUID.randomUUID;
+import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8;
+import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup;
+import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key;
+import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd;
+import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder;
+import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic;
+import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic;
+import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders;
+import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL;
+import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue;
+import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData;
+import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID;
+import static org.projectnessie.versioned.testworker.OnRefOnly.onRef;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.projectnessie.versioned.storage.common.objtypes.Compression;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+import org.projectnessie.versioned.storage.common.persist.Persist;
+import org.projectnessie.versioned.storage.testextension.NessiePersist;
+import org.projectnessie.versioned.storage.testextension.PersistExtension;
+import org.projectnessie.versioned.store.DefaultStoreWorker;
+
+@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class})
+public class TestRateLimiting {
+ @InjectSoftAssertions protected SoftAssertions soft;
+
+ @NessiePersist protected Persist persist;
+
+ @Test
+ void productionImplementations() {
+ soft.assertThat(RateLimit.create(0)).extracting(RateLimit::toString).isEqualTo("unlimited");
+ soft.assertThat(RateLimit.create(-42)).extracting(RateLimit::toString).isEqualTo("unlimited");
+ soft.assertThat(RateLimit.create(42)).extracting(RateLimit::toString).isEqualTo("up to 42");
+
+ soft.assertThatCode(() -> RateLimit.create(0).acquire()).doesNotThrowAnyException();
+ soft.assertThatCode(() -> RateLimit.create(42).acquire()).doesNotThrowAnyException();
+ }
+
+ @Test
+ void againstEmptyRepositoryWithGarbage() throws Exception {
+ soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue();
+
+ var referenceLogic = referenceLogic(persist);
+ var commitLogic = commitLogic(persist);
+
+ var unreferenced = new ArrayList<ObjId>(); // typed lists avoid raw-type unchecked warnings
+ var keptUnreferenced = new ArrayList<ObjId>();
+ var referencedCommits = new ArrayList<ObjId>();
+ var referenced = new ArrayList<ObjId>();
+ var contents = 0;
+
+ for (int i = 0; i < 25; i++) {
+ var obj =
+ stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ unreferenced.add(obj.id());
+ }
+ for (int i = 0; i < 25; i++) {
+ var cid = randomUUID();
+ var obj =
+ contentValue(
+ cid.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("dummy " + i, cid.toString())));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ unreferenced.add(obj.id());
+ }
+
+ // 10 new references
+ // 10 new RefObj
+ for (int i = 0; i < 10; i++) {
+ var head = EMPTY_OBJ_ID;
+ for (int i1 = 0; i1 < 20; i1++) {
+ var cid1 = randomUUID();
+ var cid2 = randomUUID();
+ var obj1 =
+ contentValue(
+ cid1.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString())));
+ var obj2 =
+ contentValue(
+ cid2.toString(),
+ 127,
+ DefaultStoreWorker.instance()
+ .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString())));
+ var commit =
+ commitLogic.doCommit(
+ newCommitBuilder()
+ .commitType(NORMAL)
+ .parentCommitId(head)
+ .addAdds(
+ commitAdd(
+ key("store", "key", Integer.toString(i), Integer.toString(i1), "1"),
+ 42,
+ obj1.id(),
+ null,
+ cid1))
+ .addAdds(
+ commitAdd(
+ key("store", "key", Integer.toString(i), Integer.toString(i1), "2"),
+ 42,
+ obj2.id(),
+ null,
+ cid2))
+ .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build())
+ .message("commit " + i1 + " on " + i)
+ .build(),
+ List.of(obj1, obj2));
+ head = requireNonNull(commit).id();
+
+ referencedCommits.add(head);
+ referenced.add(obj1.id());
+ referenced.add(obj2.id());
+ contents += 2;
+ }
+
+ var extendedInfo =
+ stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i));
+ soft.assertThat(persist.storeObj(extendedInfo)).isTrue();
+ referenced.add(extendedInfo.id());
+
+ referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id());
+ }
+
+ var maxObjReferenced = persist.config().currentTimeMicros();
+
+ // Unreferenced, but newer than 'maxObjReferenced'
+ for (int i = 100; i < 125; i++) {
+ var obj =
+ stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ keptUnreferenced.add(obj.id());
+ }
+ for (int i = 100; i < 125; i++) {
+ var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i));
+ soft.assertThat(persist.storeObj(obj)).isTrue();
+ keptUnreferenced.add(obj.id());
+ }
+
+ var acquires = new AtomicLong[4]; // one counter per limiter id (1..4), filled lazily by the factory
+
+ var cleanup =
+ createCleanup(
+ CleanupParams.builder()
+ .resolveCommitRatePerSecond(1)
+ .resolveObjRatePerSecond(2)
+ .purgeScanObjRatePerSecond(3)
+ .purgeDeleteObjRatePerSecond(4)
+ .rateLimitFactory(
+ i -> {
+ var a = acquires[i - 1] = new AtomicLong();
+ return a::incrementAndGet;
+ })
+ .build());
+ var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
+ var referencedObjectsResolver =
+ cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+ var resolveResult = referencedObjectsResolver.resolve();
+
+ soft.assertThat(acquires)
+ .extracting(l -> l != null ? l.get() : -1L)
+ .containsExactlyInAnyOrder(
+ 3L + 10L + referencedCommits.size(), 2L + referenced.size(), -1L, -1L);
+ soft.assertThat(resolveResult.stats())
+ .extracting(
+ ResolveStats::failure,
+ ResolveStats::numReferences,
+ ResolveStats::numCommitChainHeads,
+ ResolveStats::numCommits,
+ ResolveStats::numUniqueCommits,
+ ResolveStats::numQueuedObjs,
+ ResolveStats::numObjs)
+ .containsExactly(
+ Optional.empty(),
+ // refs
+ 3L + 10L,
+ // heads ("main" has EMPTY_OBJ_ID)
+ 2L + 10L,
+ // commits
+ 3L + 10L + referencedCommits.size(),
+ // unique commits
+ 3L + 10L + referencedCommits.size(),
+ // objects
+ 2L + referenced.size() + contents,
+ 2L + referenced.size());
+
+ var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+ var purgeResult = purgeObjects.purge();
+
+ soft.assertThat(acquires)
+ .extracting(AtomicLong::get)
+ .containsExactlyInAnyOrder(
+ 3L + 10L + referencedCommits.size(),
+ 2L + referenced.size(),
+ 5L + 100L + 20L + referencedCommits.size() + referenced.size(),
+ 50L);
+ soft.assertThat(purgeResult.stats())
+ .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs)
+ .containsExactly(
+ Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L);
+
+ soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0])))
+ .containsOnlyNulls();
+ soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0])))
+ .doesNotContainNull();
+ soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0])))
+ .doesNotContainNull();
+ }
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java
new file mode 100644
index 0000000000..1a8388ab82
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java
@@ -0,0 +1,110 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray;
+import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.projectnessie.versioned.storage.common.persist.ObjId;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestReferencedObjectsFilterImpl {
+ @InjectSoftAssertions SoftAssertions soft;
+
+ @Test
+ public void emptyFilterContainsNothing() {
+ ReferencedObjectsFilterImpl filter =
+ new ReferencedObjectsFilterImpl(CleanupParams.builder().build());
+ soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isFalse();
+ for (int i = 0; i < 100; i++) {
+ ObjId id = randomObjId();
+ soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isFalse();
+ }
+ }
+
+ @Test
+ public void filterContainsAdded() {
+ ReferencedObjectsFilterImpl filter =
+ new ReferencedObjectsFilterImpl(CleanupParams.builder().build());
+
+ soft.assertThat(filter.estimatedHeapPressure()).isGreaterThan(1L);
+
+ soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isTrue();
+ soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isFalse();
+
+ Set<ObjId> ids = new HashSet<>(3000); // typed set avoids raw-type unchecked warnings
+ for (int i = 0; i < 1000; i++) {
+ ids.add(randomObjId());
+ }
+
+ for (int i = 0; i < 1000; i++) {
+ byte[] bytes = new byte[4 + ThreadLocalRandom.current().nextInt(33)];
+ ThreadLocalRandom.current().nextBytes(bytes);
+ ids.add(objIdFromByteArray(bytes));
+ }
+
+ for (ObjId id : ids) {
+ // There is a theoretical chance that this assertion fails, but that chance is extremely low.
+ // (We're adding 2000 object IDs to a bloom filter with an expected object count of 1M and a
+ // low FPP.)
+ soft.assertThat(filter.markReferenced(id)).isTrue();
+ soft.assertThat(filter.markReferenced(id)).isFalse();
+ }
+
+ soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isTrue();
+ for (ObjId id : ids) {
+ soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isTrue();
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(ints = {100, 1_000, 10_000})
+ public void withinExpectedFpp(int expected) {
+ ReferencedObjectsFilterImpl filter =
+ new ReferencedObjectsFilterImpl(CleanupParams.builder().expectedObjCount(expected).build());
+
+ for (int i = 0; i < expected; i++) {
+ ObjId id = randomObjId();
+ soft.assertThatCode(() -> filter.markReferenced(id)).doesNotThrowAnyException();
+ soft.assertThat(filter.withinExpectedFpp()).isTrue();
+ }
+
+ // "withinExpectedFpp" should trigger at some point
+ boolean thrown = false;
+ for (int i = 0; i < expected / 2; i++) {
+ ObjId id = randomObjId();
+ try {
+ filter.markReferenced(id);
+ soft.assertThat(filter.withinExpectedFpp()).isTrue();
+ } catch (MustRestartWithBiggerFilterRuntimeException e) {
+ soft.assertThat(filter.withinExpectedFpp()).isFalse();
+ thrown = true;
+ break;
+ }
+ }
+ soft.assertThat(thrown).isTrue();
+ }
+}
diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java
new file mode 100644
index 0000000000..3e50f0af62
--- /dev/null
+++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Copyright (C) 2024 Dremio
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.projectnessie.versioned.storage.cleanup;
+
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import org.assertj.core.api.SoftAssertions;
+import org.assertj.core.api.junit.jupiter.InjectSoftAssertions;
+import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SoftAssertionsExtension.class)
+public class TestResolveStatsBuilder {
+ @InjectSoftAssertions SoftAssertions soft;
+
+ @Test
+ void resolveStatsBuilder() { // mirror every mutable builder field into the immutable builder, then compare the built results
+ var builder = new ResolveStatsBuilder();
+
+ var expected = ImmutableResolveStats.builder();
+
+ builder.mustRestart = true;
+ expected.mustRestart(true);
+ builder.started = Instant.EPOCH;
+ expected.started(Instant.EPOCH);
+ builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS);
+ expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS));
+ builder.numCommits = 1;
+ expected.numCommits(1);
+ builder.numContents = 2;
+ expected.numContents(2);
+ builder.numObjs = 3;
+ expected.numObjs(3);
+ builder.numReferences = 4;
+ expected.numReferences(4);
+ builder.numUniqueCommits = 5;
+ expected.numUniqueCommits(5);
+ builder.numCommitChainHeads = 7;
+ expected.numCommitChainHeads(7);
+ builder.numQueuedObjs = 8;
+ expected.numQueuedObjs(8);
+ builder.numQueuedObjsBulkFetches = 9;
+ expected.numQueuedObjsBulkFetches(9);
+ builder.failure = new Exception("hello");
+ expected.failure(builder.failure); // same instance, so equals() holds for the Optional<Exception>
+
+ soft.assertThat(builder.build()).isEqualTo(expected.build());
+ }
+}