From 4453b29af399637e1d7306b7e949289c58d56956 Mon Sep 17 00:00:00 2001
From: Robert Stupp
Date: Wed, 2 Oct 2024 14:10:59 +0200
Subject: [PATCH] Persistence: purge unreferenced `Obj`s
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This is an attempt to implement the algorithm mentioned in PR #9401.

The `Obj.referenced()` attribute contains the timestamp when the object was last "referenced" (aka: attempted to be written). It is ...
* set when an object is first persisted via `storeObj()`
* updated in the database when a `storeObj()` call did not persist the object because it already exists
* set/updated via `upsertObj()`
* updated via `updateConditional()`

Let's assume that there is a mechanism to identify the IDs of all referenced objects (it would be very similar to what the export functionality does).

The algorithm to purge unreferenced objects must never delete an object that is referenced at any point in time, and must handle the case of an object that was unreferenced when the purge-unreferenced-objects routine started but became referenced while it was running.

An approach could work as follows:
1. Memoize the current timestamp (minus some wall-clock drift adjustment).
2. Identify the IDs of all referenced objects. We could leverage a bloom filter, if the set of IDs is big.
3. Then scan all objects in the repository. Objects can be purged, if ...
   * the ID is not in the set (or bloom filter) generated in step 2 ...
   * _AND_ the `referenced` timestamp is less than the memoized timestamp.

Any deletion in the backing database would follow the meaning of this pseudo SQL: `DELETE FROM objs WHERE obj_id = :objId AND referenced < :memoizedTimestamp`.

Note that the `referenced` attribute is rather inaccurate when retrieved from the objects cache (aka: during normal operations), which is not a problem, because the `referenced` attribute is irrelevant for production accesses.

There are two edge cases / race conditions:
* (for some backends): A `storeObj()` operation detected that the object already exists - then the purge routine deletes that object - and then the `storeObj()` tries to update the `referenced` attribute. The result is the loss of that object. This race condition can only occur if the object existed but was not referenced.
* While the referenced objects are being identified, a new named reference (branch / tag) may be created that points to commit(s) which would be identified as unreferenced and later purged.
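For illustration, the sketch below wires the pieces introduced by this patch together, closely following the javadoc example in `Cleanup.java` further below. The wrapper class, the retry loop around `MustRestartWithBiggerFilterException`, and the three-day grace period are illustrative assumptions for this sketch, not part of the patch; the exact `throws` clause of `ReferencedObjectsResolver.resolve()` is also assumed from the javadoc hint "Must handle MustRestartWithBiggerFilterException".

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

import org.projectnessie.versioned.storage.cleanup.Cleanup;
import org.projectnessie.versioned.storage.cleanup.CleanupParams;
import org.projectnessie.versioned.storage.cleanup.MustRestartWithBiggerFilterException;
import org.projectnessie.versioned.storage.cleanup.PurgeResult;
import org.projectnessie.versioned.storage.common.persist.Persist;

// Illustrative sketch only, assembled from the javadoc example in Cleanup.java in this patch.
class CleanupUsageSketch {

  PurgeResult purgeUnreferenced(Persist persist) {
    CleanupParams params = CleanupParams.builder().build();

    // Step 1: memoize a cutoff timestamp ('referenced' values are microseconds since epoch).
    // Keep a grace period of a few days so branches can still be reset to commits that only
    // recently became unreferenced.
    var maxObjReferenced =
        TimeUnit.MILLISECONDS.toMicros(
            Instant.now().minus(3, ChronoUnit.DAYS).toEpochMilli());

    while (true) {
      var cleanup = Cleanup.createCleanup(params);
      var ctx = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced);
      try {
        // Step 2: walk all named references and mark reachable objects in the bloom filter.
        var resolveResult = cleanup.createReferencedObjectsResolver(ctx).resolve();
        // Step 3: scan all objects; delete those that are neither marked as referenced nor
        // have a 'referenced' timestamp newer than the memoized cutoff.
        return cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()).purge();
      } catch (MustRestartWithBiggerFilterException e) {
        // The bloom filter's false-positive probability exceeded the configured threshold:
        // restart the whole run with a larger expected object count.
        params = params.withIncreasedExpectedObjCount();
      }
    }
  }
}
```

The grace period mirrors the recommendation in the `Cleanup.buildReferencedObjectsContext()` javadoc below ("now minus 7 days" for production workloads), giving operators a chance to reset branches before recently unreferenced objects are physically deleted.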
--- bom/build.gradle.kts | 1 + gradle/projects.main.properties | 1 + versioned/storage/cleanup/build.gradle.kts | 61 +++ .../versioned/storage/cleanup/Cleanup.java | 111 ++++ .../storage/cleanup/CleanupParams.java | 195 +++++++ .../versioned/storage/cleanup/HeapSizes.java | 162 ++++++ .../MustRestartWithBiggerFilterException.java | 27 + ...startWithBiggerFilterRuntimeException.java | 26 + .../storage/cleanup/PurgeFilter.java | 71 +++ .../storage/cleanup/PurgeObjects.java | 31 ++ .../storage/cleanup/PurgeObjectsContext.java | 58 ++ .../storage/cleanup/PurgeObjectsImpl.java | 139 +++++ .../storage/cleanup/PurgeResult.java | 23 + .../versioned/storage/cleanup/PurgeStats.java | 43 ++ .../storage/cleanup/PurgeStatsBuilder.java | 34 ++ .../versioned/storage/cleanup/RateLimit.java | 50 ++ .../storage/cleanup/RecentObjIdFilter.java | 27 + .../cleanup/RecentObjIdFilterImpl.java | 73 +++ .../cleanup/ReferencedObjectsContext.java | 59 ++ .../cleanup/ReferencedObjectsFilter.java | 41 ++ .../cleanup/ReferencedObjectsFilterImpl.java | 117 ++++ .../cleanup/ReferencedObjectsResolver.java | 45 ++ .../ReferencedObjectsResolverImpl.java | 369 +++++++++++++ .../storage/cleanup/ResolveResult.java | 26 + .../storage/cleanup/ResolveStats.java | 60 ++ .../storage/cleanup/ResolveStatsBuilder.java | 52 ++ .../storage/cleanup/VisitedCommitFilter.java | 53 ++ .../cleanup/VisitedCommitFilterImpl.java | 45 ++ .../storage/cleanup/TestCleanup.java | 516 ++++++++++++++++++ .../cleanup/TestPurgeStatsBuilder.java | 49 ++ .../storage/cleanup/TestRateLimiting.java | 240 ++++++++ .../TestReferencedObjectsFilterImpl.java | 110 ++++ .../cleanup/TestResolveStatsBuilder.java | 63 +++ 33 files changed, 2978 insertions(+) create mode 100644 versioned/storage/cleanup/build.gradle.kts create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java create mode 100644 
versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java create mode 100644 versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java create mode 100644 versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java diff --git a/bom/build.gradle.kts b/bom/build.gradle.kts index b068c557f4..d7cb70f8dd 100644 --- a/bom/build.gradle.kts +++ b/bom/build.gradle.kts @@ -88,6 +88,7 @@ dependencies { api(project(":nessie-versioned-storage-cassandra-tests")) api(project(":nessie-versioned-storage-cassandra2")) api(project(":nessie-versioned-storage-cassandra2-tests")) + api(project(":nessie-versioned-storage-cleanup")) api(project(":nessie-versioned-storage-common")) api(project(":nessie-versioned-storage-common-proto")) api(project(":nessie-versioned-storage-common-serialize")) diff --git a/gradle/projects.main.properties b/gradle/projects.main.properties index 6abd8850df..d9051f5880 100644 --- a/gradle/projects.main.properties +++ b/gradle/projects.main.properties @@ -71,6 +71,7 @@ nessie-versioned-storage-cassandra=versioned/storage/cassandra nessie-versioned-storage-cassandra-tests=versioned/storage/cassandra-tests nessie-versioned-storage-cassandra2=versioned/storage/cassandra2 nessie-versioned-storage-cassandra2-tests=versioned/storage/cassandra2-tests +nessie-versioned-storage-cleanup=versioned/storage/cleanup nessie-versioned-storage-common=versioned/storage/common nessie-versioned-storage-common-proto=versioned/storage/common-proto nessie-versioned-storage-common-serialize=versioned/storage/common-serialize diff --git a/versioned/storage/cleanup/build.gradle.kts b/versioned/storage/cleanup/build.gradle.kts new file mode 100644 index 0000000000..fa457c1872 --- /dev/null +++ 
b/versioned/storage/cleanup/build.gradle.kts @@ -0,0 +1,61 @@ +/* + * Copyright (C) 2022 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +plugins { id("nessie-conventions-server") } + +publishingHelper { mavenName = "Nessie - Storage - Cleanup unreferenced objects" } + +description = "Identify and purge unreferenced objects in the Nessie repository." + +dependencies { + implementation(project(":nessie-model")) + implementation(project(":nessie-versioned-storage-common")) + implementation(project(":nessie-versioned-spi")) + implementation(project(":nessie-versioned-transfer-related")) + + compileOnly(libs.jakarta.validation.api) + compileOnly(libs.jakarta.annotation.api) + compileOnly(libs.microprofile.openapi) + + compileOnly(platform(libs.jackson.bom)) + compileOnly("com.fasterxml.jackson.core:jackson-annotations") + + compileOnly(libs.errorprone.annotations) + implementation(libs.guava) + implementation(libs.agrona) + implementation(libs.slf4j.api) + + compileOnly(project(":nessie-versioned-storage-testextension")) + + compileOnly(project(":nessie-immutables")) + annotationProcessor(project(":nessie-immutables", configuration = "processor")) + + testImplementation(project(":nessie-versioned-storage-testextension")) + testImplementation(project(":nessie-versioned-storage-inmemory")) + testImplementation(project(":nessie-versioned-tests")) + testImplementation(project(path = ":nessie-protobuf-relocated", configuration = "shadow")) + testImplementation(platform(libs.junit.bom)) + testImplementation(libs.bundles.junit.testing) + testRuntimeOnly(libs.logback.classic) + + testCompileOnly(project(":nessie-immutables")) + testAnnotationProcessor(project(":nessie-immutables", configuration = "processor")) + + testCompileOnly(libs.microprofile.openapi) + + testCompileOnly(platform(libs.jackson.bom)) + testCompileOnly("com.fasterxml.jackson.core:jackson-annotations") +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java new file mode 100644 index 0000000000..57f1bf1cfc --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/Cleanup.java @@ -0,0 +1,111 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter.referencedObjectsPurgeFilter; +import static org.projectnessie.versioned.storage.cleanup.ReferencedObjectsContext.objectsResolverContext; + +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Primary point of entry to remove unreferenced objects from Nessie's backend database. + * + *

+ * <p>Simplified example code flow:
+ *
+ * <pre><code>
+ * var params =
+ *   CleanupParams.builder().build();
+ * var cleanup =
+ *   createCleanup(params);
+ *
+ * var referencedObjectsContext =
+ *   cleanup.buildReferencedObjectsContext(persist,
+ *   TimeUnit.MILLISECONDS.toMicros(
+ *     Instant.now().minus(3, ChronoUnit.DAYS)
+ *       .toEpochMilli()));
+ * var referencedObjectsResolver =
+ *   cleanup.createReferencedObjectsResolver(referencedObjectsContext);
+ *
+ * // Must handle MustRestartWithBiggerFilterException
+ * var resolveResult =
+ *   referencedObjectsResolver.resolve();
+ *
+ * var purgeObjects =
+ *   cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+ * var purgeResult =
+ *   purgeObjects.purge();
+ * </code></pre>
+ */ +public class Cleanup { + private final CleanupParams cleanupParams; + + private Cleanup(CleanupParams cleanupParams) { + this.cleanupParams = cleanupParams; + } + + public static Cleanup createCleanup(CleanupParams params) { + return new Cleanup(params); + } + + /** + * Create the context holder used when identifying referenced objects and purging unreferenced + * objects. + * + *

Choosing an appropriate value for {@code maxObjReferenced} is crucial. Technically, this + * value must be at max the current timestamp - but logically {@code maxObjReferenced} should be + * the timestamp of a few days ago to not delete unreferenced objects too early and give users a + * chance to reset branches to another commit ID in case some table/view metadata is broken. + * + *

Uses an instance of {@link + * org.projectnessie.versioned.storage.cleanup.PurgeFilter.ReferencedObjectsPurgeFilter} using a + * bloom filter based {@link ReferencedObjectsFilter}, both configured using {@link + * CleanupParams}'s attributes. + * + * @param persist the persistence/repository to run against + * @param maxObjReferenced only {@link Obj}s with a {@link Obj#referenced()} older than {@code + * maxObjReferenced} will be deleted. Production workloads should set this to something like + * "now minus 7 days" to have the chance to reset branches, just in case. Technically, this + * value must not be greater than "now". "Now" should be inquired using {@code + * Persist.config().clock().instant()}. + */ + public ReferencedObjectsContext buildReferencedObjectsContext( + Persist persist, long maxObjReferenced) { + var referencedObjects = new ReferencedObjectsFilterImpl(cleanupParams); + var purgeFilter = referencedObjectsPurgeFilter(referencedObjects, maxObjReferenced); + return objectsResolverContext(persist, cleanupParams, referencedObjects, purgeFilter); + } + + /** + * Creates a new objects-resolver instance to identify referenced objects, which must be + * retained. + * + * @param objectsResolverContext context, preferably created using {@link + * #buildReferencedObjectsContext(Persist, long)} + */ + public ReferencedObjectsResolver createReferencedObjectsResolver( + ReferencedObjectsContext objectsResolverContext) { + return new ReferencedObjectsResolverImpl( + objectsResolverContext, cleanupParams.rateLimitFactory()); + } + + /** + * Creates a new objects-purger instance to delete unreferenced objects. + * + * @param purgeObjectsContext return value of {@link ReferencedObjectsResolver#resolve()}. + */ + public PurgeObjects createPurgeObjects(PurgeObjectsContext purgeObjectsContext) { + return new PurgeObjectsImpl(purgeObjectsContext, cleanupParams.rateLimitFactory()); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java new file mode 100644 index 0000000000..1814259ef6 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/CleanupParams.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REFS; +import static org.projectnessie.versioned.storage.common.logic.InternalRef.REF_REPO; +import static org.projectnessie.versioned.transfer.related.CompositeTransferRelatedObjects.createCompositeTransferRelatedObjects; + +import java.util.List; +import java.util.function.IntFunction; +import org.immutables.value.Value; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.transfer.related.TransferRelatedObjects; + +/** + * Technically and implementation oriented parameters for Nessie's backend database cleanup, + * considered for internal use only. + * + *

Any API or functionality that exposes Nessie's backend database cleanup must provide a + * functionally oriented way for configuration and generate a {@link CleanupParams} from it. + */ +@NessieImmutable +public interface CleanupParams { + // Following defaults result in a serialized bloom filter size of about 3000000 bytes. + long DEFAULT_EXPECTED_OBJ_COUNT = 1_000_000L; + double DEFAULT_FALSE_POSITIVE_PROBABILITY = 0.00001d; + double DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY = 0.0001d; + boolean DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS = false; + int DEFAULT_PENDING_OBJS_BATCH_SIZE = 20; + int DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE = 100_000; + + static ImmutableCleanupParams.Builder builder() { + return ImmutableCleanupParams.builder(); + } + + /** + * Number of expected {@link Obj}s, defaults to {@value #DEFAULT_EXPECTED_OBJ_COUNT}, used to size + * the bloom filter identifying the referenced {@link Obj}s. If {@link + * ReferencedObjectsResolver#resolve()} throws {@link MustRestartWithBiggerFilterException}, it is + * recommended to increase this value. + */ + @Value.Default + default long expectedObjCount() { + return DEFAULT_EXPECTED_OBJ_COUNT; + } + + /** + * Returns an updated instance of {@code this} value with {@link #expectedObjCount()} increased by + * {@value #DEFAULT_EXPECTED_OBJ_COUNT} as a convenience function to handle {@link + * MustRestartWithBiggerFilterException} thrown by {@link ReferencedObjectsResolver#resolve()} . + */ + default CleanupParams withIncreasedExpectedObjCount() { + return builder() + .from(this) + .expectedObjCount(expectedObjCount() + DEFAULT_EXPECTED_OBJ_COUNT) + .build(); + } + + /** + * Related to {@link #expectedObjCount()}, used to size the bloom filter identifying the + * referenced {@link Obj}s, defaults to {@value #DEFAULT_FALSE_POSITIVE_PROBABILITY}. + */ + @Value.Default + default double falsePositiveProbability() { + return DEFAULT_FALSE_POSITIVE_PROBABILITY; + } + + /** + * Maximum allowed FPP, checked when adding to the bloom filter identifying the referenced {@link + * Obj}s, defaults to {@value #DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY}. If this value is + * exceeded, a {@link MustRestartWithBiggerFilterException} will be thrown from {@link + * ReferencedObjectsResolver#resolve()}. + */ + @Value.Default + default double allowedFalsePositiveProbability() { + return DEFAULT_ALLOWED_FALSE_POSITIVE_PROBABILITY; + } + + /** Helper functionality to identify related {@link Obj}s, see {@link TransferRelatedObjects}. */ + @Value.Default + default TransferRelatedObjects relatedObjects() { + return createCompositeTransferRelatedObjects(); + } + + /** + * {@link ReferencedObjectsResolver} tries to not walk a commit more than once by memoizing the + * visited {@link CommitObj#id() commit IDs}, default is {@link + * #DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS}. Setting this to {@code true} disables this + * optimization. + */ + @Value.Default + default boolean allowDuplicateCommitTraversals() { + return DEFAULT_ALLOW_DUPLICATE_COMMIT_TRAVERSALS; + } + + /** + * Rate limit for commit objects per second during {@link ReferencedObjectsResolver#resolve()}, + * default is unlimited. Any positive value enables rate limiting, any value {@code <=0} disables + * rate limiting. + */ + @Value.Default + default int resolveCommitRatePerSecond() { + return 0; + } + + /** + * Rate limit for (non commit) objects per second during {@link + * ReferencedObjectsResolver#resolve()}, default is unlimited. 
Any positive value enables rate + * limiting, any value {@code <=0} disables rate limiting. + */ + @Value.Default + default int resolveObjRatePerSecond() { + return 0; + } + + /** + * Rate limit for scanning objects per second during {@link PurgeObjects#purge()}, default is + * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate + * limiting. + */ + @Value.Default + default int purgeScanObjRatePerSecond() { + return 0; + } + + /** + * Rate limit for purging objects per second during {@link PurgeObjects#purge()}, default is + * unlimited. Any positive value enables rate limiting, any value {@code <=0} disables rate + * limiting. + */ + @Value.Default + default int purgeDeleteObjRatePerSecond() { + return 0; + } + + /** + * {@link ReferencedObjectsResolver} attempts to fetch objects from the backend database in + * batches, this parameter defines the batch size, defaults to {@link + * #DEFAULT_PENDING_OBJS_BATCH_SIZE}. + */ + @Value.Default + default int pendingObjsBatchSize() { + return DEFAULT_PENDING_OBJS_BATCH_SIZE; + } + + /** + * Size of the "recent object IDs" filter to prevent processing the same {@link ObjId}s. This * + * happens, when the values referenced from the commit index are iterated, because it iterates * + * over all keys, not only the keys added by a particular commit. + * + *

The value defaults to {@value #DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE}. It should be higher than + * the maximum number of keys in a commit. + */ + @Value.Default + default int recentObjIdsFilterSize() { + return DEFAULT_RECENT_OBJ_IDS_FILTER_SIZE; + } + + /** Rate limiter factory for the rate limits defined above, useful for testing purposes. */ + @Value.Default + default IntFunction rateLimitFactory() { + return RateLimit::create; + } + + /** Defines the names of the Nessie internal references, do not change. */ + @Value.Default + default List internalReferenceNames() { + return List.of(REF_REFS.name(), REF_REPO.name()); + } + + /** + * Optionally enable a dry-run mode, which does not delete any objects from the backend database, + * defaults to {@code false}. + */ + @Value.Default + default boolean dryRun() { + return false; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java new file mode 100644 index 0000000000..70b2c7c097 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/HeapSizes.java @@ -0,0 +1,162 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.lang.String.format; + +final class HeapSizes { + private HeapSizes() {} + + /* + org.agrona.collections.ObjectHashSet object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 float ObjectHashSet.loadFactor N/A + 16 4 int ObjectHashSet.resizeThreshold N/A + 20 4 int ObjectHashSet.size N/A + 24 1 boolean ObjectHashSet.shouldAvoidAllocation N/A + 25 7 (alignment/padding gap) + 32 8 java.lang.Object[] ObjectHashSet.values N/A + 40 8 org.agrona.collections.ObjectHashSet.ObjectIterator ObjectHashSet.iterator N/A + 48 8 java.util.function.IntConsumer ObjectHashSet.resizeNotifier N/A + 56 8 (object alignment gap) + Instance size: 64 bytes + Space losses: 7 bytes internal + 8 bytes external = 15 bytes total + */ + static final long HEAP_SIZE_OBJECT_HASH_SET = 64L; + /* + org.projectnessie.versioned.storage.common.persist.ObjId$ObjId256 object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long ObjId256.l0 N/A + 24 8 long ObjId256.l1 N/A + 32 8 long ObjId256.l2 N/A + 40 8 long ObjId256.l3 N/A + Instance size: 48 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_OBJ_ID = 48L; + /* + long[] : 16 + 8*length + */ + static final long HEAP_SIZE_PRIMITIVE_OBJ_ARRAY = 16L; + + /* + com.google.common.hash.BloomFilter object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 4 int BloomFilter.numHashFunctions N/A + 20 4 (alignment/padding gap) + 24 8 com.google.common.hash.BloomFilterStrategies.LockFreeBitArray BloomFilter.bits N/A + 32 8 com.google.common.hash.Funnel BloomFilter.funnel N/A + 40 8 com.google.common.hash.BloomFilter.Strategy BloomFilter.strategy N/A + Instance size: 48 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_BLOOM_FILTER = 48L; + /* + com.google.common.hash.BloomFilterStrategies$LockFreeBitArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 8 java.util.concurrent.atomic.AtomicLongArray LockFreeBitArray.data N/A + 24 8 com.google.common.hash.LongAddable LockFreeBitArray.bitCount N/A + Instance size: 32 bytes + Space losses: 0 bytes internal + 0 bytes external = 0 bytes total + */ + static final long HEAP_SIZE_BIT_ARRAY = 32L; + /* + We assume that com.google.common.hash.LongAddables uses the pure-Java implementation, not Guava's + heap-expensive LongAdder implementation based on its Striped64 with 144 bytes per cell. 
+ + java.util.concurrent.atomic.AtomicLong object internals (com.google.common.hash.LongAddables.PureJavaLongAddable): + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long AtomicLong.value N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + static final long HEAP_SIZE_LONG_ADDER = 40L; + /* + java.util.concurrent.atomic.AtomicLongArray object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 (alignment/padding gap) + 16 8 long[] AtomicLongArray.array N/A + 24 8 (object alignment gap) + Instance size: 32 bytes + Space losses: 4 bytes internal + 8 bytes external = 12 bytes total + */ + static final long HEAP_SIZE_ATOMIC_LONG_ARRAY = 32L; + /* + long[] : 16 + 8*length + */ + static final long HEAP_SIZE_PRIMITIVE_LONG_ARRAY = 16L; + + /* + java.util.LinkedHashMap object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 4 (object header: class) N/A + 12 4 int HashMap.size N/A + 16 8 java.util.Set AbstractMap.keySet N/A + 24 8 java.util.Collection AbstractMap.values N/A + 32 4 int HashMap.modCount N/A + 36 4 int HashMap.threshold N/A + 40 4 float HashMap.loadFactor N/A + 44 4 int LinkedHashMap.putMode N/A + 48 8 java.util.HashMap.Node[] HashMap.table N/A + 56 8 java.util.Set HashMap.entrySet N/A + 64 1 boolean LinkedHashMap.accessOrder N/A + 65 7 (alignment/padding gap) + 72 8 java.util.LinkedHashMap.Entry LinkedHashMap.head N/A + 80 8 java.util.LinkedHashMap.Entry LinkedHashMap.tail N/A + 88 8 (object alignment gap) + Instance size: 96 bytes + Space losses: 7 bytes internal + 8 bytes external = 15 bytes total + */ + static final long HEAP_SIZE_LINKED_HASH_MAP = 96L; + /* + java.util.LinkedHashMap$Entry object internals: + OFF SZ TYPE DESCRIPTION VALUE + 0 8 (object header: mark) N/A + 8 8 (object header: class) N/A + 16 4 int Node.hash N/A + 20 4 (alignment/padding gap) + 24 8 java.lang.Object Node.key N/A + 32 8 java.lang.Object Node.value N/A + 40 8 java.util.HashMap.Node Node.next N/A + 48 8 java.util.LinkedHashMap.Entry Entry.before N/A + 56 8 java.util.LinkedHashMap.Entry Entry.after N/A + Instance size: 64 bytes + Space losses: 4 bytes internal + 0 bytes external = 4 bytes total + */ + static final long HEAP_SIZE_LINKED_HASH_MAP_ENTRY = 64L; + + static final long HEAP_SIZE_POINTER = 8L; + + static String memSizeToStringMB(long bytes) { + return format("%.1f M", ((double) bytes) / 1024L / 1024L); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java new file mode 100644 index 0000000000..7349b9983c --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterException.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +/** + * Thrown when the bloom filter's FPP is above the configured threshold when adding IDs. If this + * exception is encountered, the current garbage-collection run must be aborted and + * restarted with a bigger {@link CleanupParams#expectedObjCount()} value. + */ +public class MustRestartWithBiggerFilterException extends Exception { + public MustRestartWithBiggerFilterException(String msg, Throwable cause) { + super(msg, cause); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java new file mode 100644 index 0000000000..a981b366b0 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/MustRestartWithBiggerFilterRuntimeException.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +/** + * Internally used unchecked exception to eventually be "wrapped" by a checked {@link + * MustRestartWithBiggerFilterException}. + */ +class MustRestartWithBiggerFilterRuntimeException extends RuntimeException { + public MustRestartWithBiggerFilterRuntimeException(String msg) { + super(msg); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java new file mode 100644 index 0000000000..ad5cb5409d --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeFilter.java @@ -0,0 +1,71 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import java.util.List; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Obj; + +/** Filter to decide whether an {@link Obj} must be kept or whether it can be deleted. */ +public interface PurgeFilter { + boolean mustKeep(@NotNull Obj obj); + + @NessieImmutable + interface CompositePurgeFilter extends PurgeFilter { + List filters(); + + static CompositePurgeFilter compositePurgeFilter(PurgeFilter... filters) { + return ImmutableCompositePurgeFilter.of(List.of(filters)); + } + + static CompositePurgeFilter compositePurgeFilter(List filters) { + return ImmutableCompositePurgeFilter.of(filters); + } + + @Override + default boolean mustKeep(Obj obj) { + for (PurgeFilter filter : filters()) { + if (filter.mustKeep(obj)) { + return true; + } + } + return false; + } + } + + /** + * Recommended default purge filter, which considers a {@link ReferencedObjectsFilter} and a + * maximum value of {@link Obj#referenced()}. + */ + @NessieImmutable + interface ReferencedObjectsPurgeFilter extends PurgeFilter { + ReferencedObjectsFilter referencedObjects(); + + long maxObjReferenced(); + + static ReferencedObjectsPurgeFilter referencedObjectsPurgeFilter( + ReferencedObjectsFilter referencedObjects, long maxObjReferenced) { + return ImmutableReferencedObjectsPurgeFilter.of(referencedObjects, maxObjReferenced); + } + + @Override + default boolean mustKeep(Obj obj) { + return obj.referenced() > maxObjReferenced() + || referencedObjects().isProbablyReferenced(obj.id()); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java new file mode 100644 index 0000000000..57f230258c --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjects.java @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +public interface PurgeObjects { + PurgeResult purge(); + + /** Return the current statistics, returns a result after {@link #purge()} threw an exception. */ + PurgeStats getStats(); + + /** + * Returns the estimated maximum heap pressure of this object tree. Considers the data + * structured that are required for the purge operation to work, a subset of the structures + * required for {@link ReferencedObjectsResolver#resolve()}. It is wrong to use the sum of {@link + * ReferencedObjectsResolver#estimatedHeapPressure()} and this value. 
+ */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java new file mode 100644 index 0000000000..1d986a6957 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsContext.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Holds the data structures and parameters that are needed for the {@linkplain PurgeObjects purge + * operation}. + * + *

Once the {@linkplain ReferencedObjectsResolver referenced objects have been resolved}, the + * data structures that are not needed for the purge operation should become eligible for Java GC, + * which is why this context object exists and holds less information than {@link + * ReferencedObjectsContext}. + */ +@NessieImmutable +public interface PurgeObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + PurgeFilter purgeFilter(); + + int scanObjRatePerSecond(); + + int deleteObjRatePerSecond(); + + static PurgeObjectsContext purgeObjectsContext( + ReferencedObjectsContext referencedObjectsContext) { + return ImmutablePurgeObjectsContext.of( + referencedObjectsContext.persist(), + referencedObjectsContext.referencedObjects(), + referencedObjectsContext.purgeFilter(), + referencedObjectsContext.params().purgeScanObjRatePerSecond(), + referencedObjectsContext.params().purgeDeleteObjRatePerSecond(), + referencedObjectsContext.params().dryRun()); + } + + boolean dryRun(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java new file mode 100644 index 0000000000..f443da1c16 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeObjectsImpl.java @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB; + +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; +import org.projectnessie.versioned.storage.common.persist.CloseableIterator; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class PurgeObjectsImpl implements PurgeObjects { + private static final Logger LOGGER = LoggerFactory.getLogger(PurgeObjectsImpl.class); + + private final PurgeObjectsContext purgeObjectsContext; + private final PurgeStatsBuilder stats; + private final AtomicBoolean used = new AtomicBoolean(); + private final RateLimit scanRateLimiter; + private final RateLimit purgeRateLimiter; + + public PurgeObjectsImpl( + PurgeObjectsContext purgeObjectsContext, IntFunction rateLimitIntFunction) { + this.purgeObjectsContext = purgeObjectsContext; + this.stats = new PurgeStatsBuilder(); + this.scanRateLimiter = rateLimitIntFunction.apply(purgeObjectsContext.scanObjRatePerSecond()); + this.purgeRateLimiter = + rateLimitIntFunction.apply(purgeObjectsContext.deleteObjRatePerSecond()); + } + + @Override + public PurgeResult purge() { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + var purgeFilter = purgeObjectsContext.purgeFilter(); + var persist = purgeObjectsContext.persist(); + var clock = persist.config().clock(); + + LOGGER.info( + "Purging unreferenced objects in repository '{}', scanning {} objects per second, deleting {} objects per second, estimated context heap pressure: {}", + persist.config().repositoryId(), + scanRateLimiter, + purgeRateLimiter, + memSizeToStringMB(estimatedHeapPressure())); + + PurgeStats finalStats = null; + try { + stats.started = clock.instant(); + try (CloseableIterator iter = persist.scanAllObjects(Set.of())) { + while (iter.hasNext()) { + scanRateLimiter.acquire(); + stats.numScannedObjs++; + var obj = iter.next(); + if (purgeFilter.mustKeep(obj)) { + continue; + } + + purgeRateLimiter.acquire(); + purgeObj(obj); + } + } catch (RuntimeException e) { + stats.failure = e; + } finally { + stats.ended = clock.instant(); + finalStats = stats.build(); + } + + LOGGER.info( + "Successfully finished purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + } catch (RuntimeException e) { + if (finalStats != null) { + LOGGER.warn( + "Error while purging unreferenced objects after {} in repository '{}', purge stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure()), + e); + } else { + LOGGER.warn( + "Error while purging unreferenced objects in repository '{}'", + persist.config().repositoryId(), + stats.failure); + } + throw e; + } + + return ImmutablePurgeResult.of(stats.build()); + } + + @Override + public PurgeStats getStats() { + return stats.build(); + } + + @Override + public long estimatedHeapPressure() { + return purgeObjectsContext.referencedObjects().estimatedHeapPressure(); + } + + private void purgeObj(Obj obj) { + // TODO delete in parallel (multiple threads) + stats.numPurgedObjs++; + + var persist = 
purgeObjectsContext.persist(); + + var objType = obj.type(); + LOGGER.trace( + "Deleting obj {} of type {}/{} in repository '{}'", + obj.id(), + objType.name(), + objType.shortName(), + persist.config().repositoryId()); + + if (!purgeObjectsContext.dryRun()) { + persist.deleteWithReferenced(obj); + } + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java new file mode 100644 index 0000000000..2e94a6e237 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeResult.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeResult { + PurgeStats stats(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java new file mode 100644 index 0000000000..f597503e25 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStats.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface PurgeStats { + Instant started(); + + Instant ended(); + + default Duration duration() { + return Duration.between(started(), ended()); + } + + /** Number of objects handled while scanning the Nessie repository. */ + long numScannedObjs(); + + /** + * Number of purged (deleted) objects. For a {@linkplain CleanupParams#dryRun() dry-run}, this + * value indicates the number of objects that would have been deleted. 
+ */ + long numPurgedObjs(); + + Optional failure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java new file mode 100644 index 0000000000..b318b257f7 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/PurgeStatsBuilder.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; + +final class PurgeStatsBuilder { + Instant started; + Instant ended; + + Exception failure; + + long numScannedObjs; + long numPurgedObjs; + + PurgeStats build() { + return ImmutablePurgeStats.of( + started, ended, numScannedObjs, numPurgedObjs, Optional.ofNullable(failure)); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java new file mode 100644 index 0000000000..8df6112a16 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RateLimit.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import com.google.common.util.concurrent.RateLimiter; + +public interface RateLimit { + void acquire(); + + @SuppressWarnings("UnstableApiUsage") + static RateLimit create(int ratePerSecond) { + if (ratePerSecond <= 0) { + return new RateLimit() { + @Override + public void acquire() {} + + @Override + public String toString() { + return "unlimited"; + } + }; + } + return new RateLimit() { + final RateLimiter limiter = RateLimiter.create(ratePerSecond); + + @Override + public void acquire() { + limiter.acquire(); + } + + @Override + public String toString() { + return "up to " + ratePerSecond; + } + }; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java new file mode 100644 index 0000000000..171a9ac74d --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilter.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.persist.ObjId; + +public interface RecentObjIdFilter { + boolean add(ObjId id); + + boolean contains(ObjId id); + + /** Returns the estimated maximum heap pressure of this object tree. */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java new file mode 100644 index 0000000000..bac4ff59fe --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/RecentObjIdFilterImpl.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LINKED_HASH_MAP_ENTRY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_POINTER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY; + +import java.util.LinkedHashMap; +import java.util.Map; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +final class RecentObjIdFilterImpl implements RecentObjIdFilter { + private static final Object PRESENT = new Object(); + + private final LinkedHashMap recentObjIds; + private final long estimatedHeapPressure; + + public RecentObjIdFilterImpl(int recentObjIdsFilterSize) { + int capacity = (int) Math.ceil(recentObjIdsFilterSize / 0.75d); + this.estimatedHeapPressure = calculateEstimatedHeapPressure(recentObjIdsFilterSize, capacity); + + this.recentObjIds = + new LinkedHashMap<>(capacity) { + @Override + protected boolean removeEldestEntry(Map.Entry eldest) { + return size() >= recentObjIdsFilterSize; + } + }; + } + + @Override + public boolean contains(ObjId id) { + return recentObjIds.containsKey(id); + } + + @Override + public boolean add(ObjId id) { + return recentObjIds.put(id, PRESENT) == null; + } + + @Override + public long estimatedHeapPressure() { + return estimatedHeapPressure; + } + + private long calculateEstimatedHeapPressure(int size, int capacity) { + int tableSize = -1 >>> Integer.numberOfLeadingZeros(capacity - 1); + + return HEAP_SIZE_LINKED_HASH_MAP + + + // LHM entries + (HEAP_SIZE_LINKED_HASH_MAP_ENTRY + HEAP_SIZE_OBJ_ID) * size + // LHM table/node-array + + HEAP_SIZE_PRIMITIVE_LONG_ARRAY + + HEAP_SIZE_POINTER * tableSize; + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java new file mode 100644 index 0000000000..6c7dcb9cec --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsContext.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.VisitedCommitFilter.ALLOW_DUPLICATE_TRAVERSALS; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.persist.Persist; + +/** + * Holds the data structures and parameters that are needed to {@linkplain ReferencedObjectsResolver + * resolving referenced objects}. 
+ */ +@NessieImmutable +public interface ReferencedObjectsContext { + @NotNull + Persist persist(); + + @NotNull + ReferencedObjectsFilter referencedObjects(); + + @NotNull + CleanupParams params(); + + @NotNull + PurgeFilter purgeFilter(); + + @NotNull + VisitedCommitFilter visitedCommitFilter(); + + static ReferencedObjectsContext objectsResolverContext( + Persist persist, + CleanupParams params, + ReferencedObjectsFilter referencedObjects, + PurgeFilter purgeFilter) { + return ImmutableReferencedObjectsContext.of( + persist, + referencedObjects, + params, + purgeFilter, + params.allowDuplicateCommitTraversals() + ? ALLOW_DUPLICATE_TRAVERSALS + : new VisitedCommitFilterImpl()); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java new file mode 100644 index 0000000000..07e359c96a --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilter.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import jakarta.validation.constraints.NotNull; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Mark {@linkplain ObjId object IDs} as referenced and allow checking whether object IDs are marked + * as referenced. + * + *
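+ * <p>How the two sides of the contract are used during a cleanup run (a sketch; {@code filter}
+ * stands for the instance created from the {@link CleanupParams}):
+ *
+ * <pre>{@code
+ * // resolve phase: mark every object that is reachable from any reference
+ * filter.markReferenced(objId);
+ *
+ * // purge phase: only objects that are not (probably) referenced are purge candidates
+ * if (!filter.isProbablyReferenced(objId)) {
+ *   // candidate for deletion (still guarded by the max-obj-referenced timestamp)
+ * }
+ * }</pre>
+ *
+ * <p>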

The implementation is usually backed by a probabilistic data structure (bloom filter), which + * means that there is a {@linkplain #expectedFpp() chance} that an unreferenced object is not + * collected, but all referenced objects are guaranteed to remain. + */ +public interface ReferencedObjectsFilter { + boolean markReferenced(@NotNull ObjId objId); + + boolean isProbablyReferenced(@NotNull ObjId objId); + + boolean withinExpectedFpp(); + + long approximateElementCount(); + + double expectedFpp(); + + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java new file mode 100644 index 0000000000..506c78574b --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsFilterImpl.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_ATOMIC_LONG_ARRAY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BIT_ARRAY; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_BLOOM_FILTER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_LONG_ADDER; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_LONG_ARRAY; + +import com.google.common.hash.BloomFilter; +import com.google.common.hash.PrimitiveSink; +import java.util.concurrent.atomic.AtomicLong; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@SuppressWarnings("UnstableApiUsage") +final class ReferencedObjectsFilterImpl implements ReferencedObjectsFilter { + + private final BloomFilter filter; + private final double allowedFalsePositiveProbability; + private final AtomicLong remainingElements; + private final long estimatedHeapPressure; + + ReferencedObjectsFilterImpl(CleanupParams params) { + this.filter = createBloomFilter(params); + this.remainingElements = new AtomicLong(params.expectedObjCount()); + this.allowedFalsePositiveProbability = params.allowedFalsePositiveProbability(); + this.estimatedHeapPressure = calculateEstimatedHeapPressure(params); + } + + static BloomFilter createBloomFilter(CleanupParams params) { + return BloomFilter.create( + ReferencedObjectsFilterImpl::funnel, + params.expectedObjCount(), + params.falsePositiveProbability()); + } + + private static void funnel(ObjId id, PrimitiveSink primitiveSink) { + var idSize = id.size(); + var i = 0; + for (; idSize >= 8; idSize -= 8) { + primitiveSink.putLong(id.longAt(i++)); + } + i <<= 3; + for (; idSize > 0; idSize--) { + primitiveSink.putByte(id.byteAt(i++)); + } + } + + @Override + public boolean markReferenced(ObjId objId) { + if (filter.put(objId)) { + if 
(remainingElements.decrementAndGet() >= 0L || withinExpectedFpp()) { + return true; + } + throw new MustRestartWithBiggerFilterRuntimeException( + "Bloom filter exceeded the configured expected FPP"); + } + return false; + } + + @Override + public boolean isProbablyReferenced(ObjId objId) { + return filter.mightContain(objId); + } + + @Override + public boolean withinExpectedFpp() { + return expectedFpp() <= allowedFalsePositiveProbability; + } + + @Override + public long approximateElementCount() { + return filter.approximateElementCount(); + } + + @Override + public double expectedFpp() { + return filter.expectedFpp(); + } + + @Override + public long estimatedHeapPressure() { + return estimatedHeapPressure; + } + + private static long calculateEstimatedHeapPressure(CleanupParams params) { + var bits = optimalNumOfBits(params.expectedObjCount(), params.falsePositiveProbability()); + var arrayLen = bits / 64 + 1; + return HEAP_SIZE_BLOOM_FILTER + + HEAP_SIZE_BIT_ARRAY + + HEAP_SIZE_LONG_ADDER + + HEAP_SIZE_ATOMIC_LONG_ARRAY + + HEAP_SIZE_PRIMITIVE_LONG_ARRAY * arrayLen; + } + + // See com.google.common.hash.BloomFilter.optimalNumOfBits + private static long optimalNumOfBits(long expectedInsertions, double fpp) { + if (fpp == 0) { + fpp = Double.MIN_VALUE; + } + return (long) (-expectedInsertions * Math.log(fpp) / (Math.log(2) * Math.log(2))); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java new file mode 100644 index 0000000000..ce3906933b --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolver.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.persist.Persist; + +public interface ReferencedObjectsResolver { + /** + * Identifies all referenced objects in the {@linkplain Persist Nessie repository}. + * + * @return result containing the information for the follow-up {@linkplain PurgeObjects#purge() + * purge operation} and stats. + * @throws MustRestartWithBiggerFilterException thrown if this operation identifies more than the + * configured {@linkplain CleanupParams#expectedObjCount() expected object count}. This + * exception must be handled by calling code + */ + ResolveResult resolve() throws MustRestartWithBiggerFilterException; + + /** + * Return the current statistics, returns a valid result, even if {@link #resolve()} threw an + * exception. + */ + ResolveStats getStats(); + + /** + * Returns the estimated maximum heap pressure of this object tree. Considers the data + * structured that are required for the resolve operation to work, a superset of the structures + * required for {@link PurgeObjects#purge()}. 
It is wrong to use the sum of {@link + * PurgeObjects#estimatedHeapPressure()} and this value. + */ + long estimatedHeapPressure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java new file mode 100644 index 0000000000..e0f3f833d4 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ReferencedObjectsResolverImpl.java @@ -0,0 +1,369 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import static com.google.common.base.Preconditions.checkState; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.memSizeToStringMB; +import static org.projectnessie.versioned.storage.cleanup.PurgeObjectsContext.purgeObjectsContext; +import static org.projectnessie.versioned.storage.common.logic.CommitLogQuery.commitLogQuery; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.indexesLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.logic.ReferencesQuery.referencesQuery; +import static org.projectnessie.versioned.storage.common.objtypes.StandardObjType.VALUE; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; + +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.IntFunction; +import org.agrona.collections.ObjectHashSet; +import org.projectnessie.model.Content; +import org.projectnessie.versioned.storage.common.indexes.StoreIndexElement; +import org.projectnessie.versioned.storage.common.logic.CommitLogic; +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.objtypes.CommitOp; +import org.projectnessie.versioned.storage.common.objtypes.ContentValueObj; +import org.projectnessie.versioned.storage.common.persist.Obj; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.common.persist.Reference; +import org.projectnessie.versioned.store.DefaultStoreWorker; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +final class ReferencedObjectsResolverImpl implements ReferencedObjectsResolver { + private static final Logger LOGGER = LoggerFactory.getLogger(ReferencedObjectsResolverImpl.class); + + private final ObjectHashSet pendingObjs = new ObjectHashSet<>(); + private final Deque pendingHeads = new ArrayDeque<>(); + + /** + * Set of recently 
handled 'ObjId's to prevent re-processing the same objects multiple times. This + * happens, when the values referenced from the commit index are iterated, because it iterates + * over all keys, not only the keys added by a particular commit. + */ + private final RecentObjIdFilter recentObjIds; + + private final ReferencedObjectsContext referencedObjectsContext; + + private final ResolveStatsBuilder stats; + private final RateLimit commitRateLimiter; + private final RateLimit objRateLimiter; + + private final AtomicBoolean used = new AtomicBoolean(); + + ReferencedObjectsResolverImpl( + ReferencedObjectsContext referencedObjectsContext, + IntFunction rateLimitIntFunction) { + this.referencedObjectsContext = referencedObjectsContext; + this.stats = new ResolveStatsBuilder(); + this.commitRateLimiter = + rateLimitIntFunction.apply(referencedObjectsContext.params().resolveCommitRatePerSecond()); + this.objRateLimiter = + rateLimitIntFunction.apply(referencedObjectsContext.params().resolveObjRatePerSecond()); + this.recentObjIds = + new RecentObjIdFilterImpl(referencedObjectsContext.params().recentObjIdsFilterSize()); + } + + @Override + public long estimatedHeapPressure() { + return referencedObjectsContext.referencedObjects().estimatedHeapPressure() + + referencedObjectsContext.visitedCommitFilter().estimatedHeapPressure() + + recentObjIds.estimatedHeapPressure(); + } + + @Override + public ResolveResult resolve() throws MustRestartWithBiggerFilterException { + checkState(used.compareAndSet(false, true), "resolve() has already been called."); + + LOGGER.info( + "Identifying referenced objects in repository '{}', processing {} commits per second, processing {} objects per second, estimated context heap pressure: {}", + referencedObjectsContext.persist().config().repositoryId(), + commitRateLimiter, + objRateLimiter, + memSizeToStringMB(estimatedHeapPressure())); + + var persist = referencedObjectsContext.persist(); + var params = referencedObjectsContext.params(); + + ResolveStats finalStats = null; + try { + finalStats = doResolve(persist, params); + + LOGGER.info( + "Successfully finished identifying referenced objects after {} in repository '{}', resolve stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + } catch (MustRestartWithBiggerFilterRuntimeException mustRestart) { + LOGGER.warn( + "Must restart identifying referenced objects for repository '{}', current parameters: expected object count: {}, FPP: {}, allowed FPP: {}, resolve stats: {}, estimated context heap pressure: {}", + persist.config().repositoryId(), + params.expectedObjCount(), + params.falsePositiveProbability(), + params.allowedFalsePositiveProbability(), + finalStats, + memSizeToStringMB(estimatedHeapPressure())); + throw new MustRestartWithBiggerFilterException(mustRestart.getMessage(), mustRestart); + } catch (RuntimeException e) { + if (finalStats != null) { + LOGGER.warn( + "Error while identifying referenced objects after {} in repository '{}', stats: {}, estimated context heap pressure: {}", + finalStats.duration(), + persist.config().repositoryId(), + finalStats, + memSizeToStringMB(estimatedHeapPressure()), + e); + } else { + LOGGER.warn( + "Error while identifying referenced objects after {} in repository '{}'", + persist.config().repositoryId(), + e); + } + throw e; + } + + return ImmutableResolveResult.of(stats.build(), purgeObjectsContext(referencedObjectsContext)); + } + + private 
ResolveStats doResolve(Persist persist, CleanupParams params) { + var clock = persist.config().clock(); + + ResolveStats finalStats; + try { + stats.started = clock.instant(); + + checkState( + repositoryLogic(persist).repositoryExists(), + "The provided repository has not been initialized."); + + params.relatedObjects().repositoryRelatedObjects().forEach(this::pendingObj); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + for (String internalReferenceName : params.internalReferenceNames()) { + var intRef = persist.fetchReference(internalReferenceName); + checkState(intRef != null, "Internal reference %s not found!", internalReferenceName); + handleReference(intRef); + processPendingHeads(commitLogic); + } + + for (var referencesIter = referenceLogic.queryReferences(referencesQuery()); + referencesIter.hasNext(); ) { + var reference = referencesIter.next(); + handleReference(reference); + processPendingHeads(commitLogic); + } + + processPendingHeads(commitLogic); + + while (!pendingObjs.isEmpty()) { + processPendingObjs(); + } + } catch (RuntimeException e) { + stats.mustRestart = e instanceof MustRestartWithBiggerFilterRuntimeException; + stats.failure = e; + throw e; + } finally { + stats.ended = clock.instant(); + finalStats = stats.build(); + } + return finalStats; + } + + private void processPendingHeads(CommitLogic commitLogic) { + while (!pendingHeads.isEmpty()) { + var head = pendingHeads.removeFirst(); + commitLogic.commitLog(commitLogQuery(head)).forEachRemaining(this::handleCommit); + } + } + + @Override + public ResolveStats getStats() { + return stats.build(); + } + + private void handleReference(Reference reference) { + stats.numReferences++; + + var persist = referencedObjectsContext.persist(); + + if (reference.deleted()) { + LOGGER.trace( + "Skipping deleted reference {} in repository '{}'", + reference.name(), + persist.config().repositoryId()); + return; + } + + LOGGER.debug( + "Walking reference {} in repository '{}' starting at commit {}", + reference.name(), + persist.config().repositoryId(), + reference.pointer()); + + referencedObjectsContext + .params() + .relatedObjects() + .referenceRelatedObjects(reference) + .forEach(this::pendingObj); + + commitChain(reference.pointer()); + + var extendedInfo = reference.extendedInfoObj(); + if (extendedInfo != null) { + referencedObjectsContext.referencedObjects().markReferenced(extendedInfo); + } + } + + private void commitChain(ObjId head) { + if (EMPTY_OBJ_ID.equals(head)) { + // Prevent visiting the same commit more often than once + return; + } + + stats.numCommitChainHeads++; + + if (referencedObjectsContext.visitedCommitFilter().alreadyVisited(head)) { + // Prevent visiting the same commit more often than once + return; + } + + pendingHeads.addLast(head); + } + + private void handleCommit(CommitObj commit) { + stats.numCommits++; + + if (!referencedObjectsContext.visitedCommitFilter().mustVisit(commit.id())) { + // Prevent visiting the same commit more often than once + return; + } + + commitRateLimiter.acquire(); + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Handling commit {} in repository '{}'", commit.id(), persist.config().repositoryId()); + + stats.numUniqueCommits++; + + referencedObjectsContext.referencedObjects().markReferenced(commit.id()); + + referencedObjectsContext + .params() + .relatedObjects() + .commitRelatedObjects(commit) + .forEach(this::pendingObj); + + var indexesLogic = indexesLogic(referencedObjectsContext.persist()); + 
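+    // Iterate over the *complete* index of this commit, which also contains the keys of all
+    // parent commits, not only the keys added by this particular commit. The 'recentObjIds'
+    // filter in pendingObj() prevents re-processing the same content objects over and over again.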
var index = indexesLogic.buildCompleteIndexOrEmpty(commit); + for (StoreIndexElement indexElement : index) { + var content = indexElement.content(); + if (content.action().exists()) { + var value = content.value(); + pendingObj(value); + } + } + + commit.secondaryParents().forEach(this::commitChain); + } + + private void pendingObj(ObjId objId) { + if (recentObjIds.contains(objId)) { + return; + } + + if (!pendingObjs.add(objId)) { + return; + } + + stats.numQueuedObjs++; + + if (pendingObjs.size() >= referencedObjectsContext.params().pendingObjsBatchSize()) { + processPendingObjs(); + } + } + + private void processPendingObjs() { + stats.numQueuedObjsBulkFetches++; + + var persist = referencedObjectsContext.persist(); + + LOGGER.debug( + "Fetching {} pending objects in repository '{}'", + pendingObjs.size(), + persist.config().repositoryId()); + + var objs = persist.fetchObjsIfExist(pendingObjs.toArray(ObjId[]::new)); + // Must clear 'pendingObjs' here, because handleObj can add more objects to it + pendingObjs.clear(); + + for (Obj obj : objs) { + if (obj != null) { + handleObj(obj); + } + } + } + + private void handleObj(Obj obj) { + objRateLimiter.acquire(); + + if (!recentObjIds.add(obj.id())) { + // already handled + return; + } + + stats.numObjs++; + + var persist = referencedObjectsContext.persist(); + + var objType = obj.type(); + + LOGGER.debug( + "Handling obj {} of type {}/{} in repository '{}'", + obj.id(), + objType.name(), + objType.shortName(), + persist.config().repositoryId()); + + referencedObjectsContext.referencedObjects().markReferenced(obj.id()); + + if (VALUE.equals(objType)) { + var contentValueObj = (ContentValueObj) obj; + var content = + DefaultStoreWorker.instance() + .valueFromStore(contentValueObj.payload(), contentValueObj.data()); + + handleContent(content); + } + } + + private void handleContent(Content content) { + stats.numContents++; + + referencedObjectsContext + .params() + .relatedObjects() + .contentRelatedObjects(content) + .forEach(this::pendingObj); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java new file mode 100644 index 0000000000..120c791c76 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveResult.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface ResolveResult { + ResolveStats stats(); + + /** Context required for a purge. 
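+   * Typically handed to {@code Cleanup.createPurgeObjects(..)}, as sketched here after the
+   * pattern used in this module's tests:
+   *
+   * <pre>{@code
+   * var resolveResult = referencedObjectsResolver.resolve();
+   * var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext());
+   * var purgeResult = purgeObjects.purge();
+   * }</pre>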
*/ + PurgeObjectsContext purgeObjectsContext(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java new file mode 100644 index 0000000000..dd61216849 --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStats.java @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import org.projectnessie.nessie.immutables.NessieImmutable; + +@NessieImmutable +public interface ResolveStats { + Instant started(); + + Instant ended(); + + default Duration duration() { + return Duration.between(started(), ended()); + } + + boolean mustRestart(); + + /** Number of processed references, including Nessie internal references. */ + long numReferences(); + + /** Number of commit chain "heads". */ + long numCommitChainHeads(); + + /** Number of processed commit objects, including Nessie internal commits. */ + long numCommits(); + + /** Number of processed unique commit objects, including Nessie internal commits. */ + long numUniqueCommits(); + + /** Number of non-commit objects. */ + long numObjs(); + + /** Number of {@link org.projectnessie.model.Content} objects. */ + long numContents(); + + /** Number of non-commit objects that had been queued for batched commit object handling. */ + long numQueuedObjs(); + + /** Number of bulk non-commit object fetches. */ + long numQueuedObjsBulkFetches(); + + Optional failure(); +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java new file mode 100644 index 0000000000..bc2fef3ada --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/ResolveStatsBuilder.java @@ -0,0 +1,52 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.util.Optional; + +final class ResolveStatsBuilder { + Instant started; + Instant ended; + + boolean mustRestart; + Exception failure; + + long numReferences; + long numCommitChainHeads; + long numCommits; + long numUniqueCommits; + long numObjs; + long numContents; + long numQueuedObjs; + long numQueuedObjsBulkFetches; + + ResolveStats build() { + return ImmutableResolveStats.of( + started, + ended, + mustRestart, + numReferences, + numCommitChainHeads, + numCommits, + numUniqueCommits, + numObjs, + numContents, + numQueuedObjs, + numQueuedObjsBulkFetches, + Optional.ofNullable(failure)); + } +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java new file mode 100644 index 0000000000..5570d8fc1b --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilter.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import org.projectnessie.versioned.storage.common.objtypes.CommitObj; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +/** + * Filter to prevent processing the same {@linkplain CommitObj Nessie commit} more than once. + * + *
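+ * <p>A sketch of how {@code ReferencedObjectsResolverImpl} consults this filter:
+ *
+ * <pre>{@code
+ * // before queueing a commit-chain head for traversal:
+ * if (visitedCommitFilter.alreadyVisited(head)) {
+ *   return;
+ * }
+ *
+ * // when an individual commit is handled:
+ * if (!visitedCommitFilter.mustVisit(commit.id())) {
+ *   return;
+ * }
+ * }</pre>
+ *
+ * <p>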

There are two implementations of this interface: {@linkplain #ALLOW_DUPLICATE_TRAVERSALS one} + * that does not prevent duplicate processing, and {@linkplain VisitedCommitFilterImpl the + * default one} that does. The parameter {@link CleanupParams#allowDuplicateCommitTraversals()} is + * used to decide which implementation is being used. + */ +public interface VisitedCommitFilter { + boolean mustVisit(ObjId commitObjId); + + boolean alreadyVisited(ObjId commitObjId); + + long estimatedHeapPressure(); + + VisitedCommitFilter ALLOW_DUPLICATE_TRAVERSALS = + new VisitedCommitFilter() { + @Override + public boolean mustVisit(ObjId commitObjId) { + return true; + } + + @Override + public boolean alreadyVisited(ObjId commitObjId) { + return false; + } + + @Override + public long estimatedHeapPressure() { + return 0; + } + }; +} diff --git a/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java new file mode 100644 index 0000000000..c5efb936aa --- /dev/null +++ b/versioned/storage/cleanup/src/main/java/org/projectnessie/versioned/storage/cleanup/VisitedCommitFilterImpl.java @@ -0,0 +1,45 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.agrona.collections.Hashing.DEFAULT_LOAD_FACTOR; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJECT_HASH_SET; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_OBJ_ID; +import static org.projectnessie.versioned.storage.cleanup.HeapSizes.HEAP_SIZE_PRIMITIVE_OBJ_ARRAY; + +import org.agrona.collections.ObjectHashSet; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +final class VisitedCommitFilterImpl implements VisitedCommitFilter { + private final ObjectHashSet visited = new ObjectHashSet<>(64, DEFAULT_LOAD_FACTOR); + + @Override + public boolean mustVisit(ObjId commitObjId) { + return visited.add(commitObjId); + } + + @Override + public boolean alreadyVisited(ObjId commitObjId) { + return visited.contains(commitObjId); + } + + @Override + public long estimatedHeapPressure() { + var sz = visited.size(); + var cap = visited.capacity(); + return HEAP_SIZE_OBJECT_HASH_SET + HEAP_SIZE_PRIMITIVE_OBJ_ARRAY * cap + HEAP_SIZE_OBJ_ID * sz; + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java new file mode 100644 index 0000000000..3821674cf7 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestCleanup.java @@ -0,0 +1,516 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; +import static org.projectnessie.versioned.testworker.OnRefOnly.onRef; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.nessie.immutables.NessieImmutable; +import org.projectnessie.versioned.storage.common.logic.CommitLogic; +import org.projectnessie.versioned.storage.common.logic.InternalRef; +import org.projectnessie.versioned.storage.common.objtypes.Compression; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestCleanup { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + void mustRestartWithBiggerFilterThrown() { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var maxObjReferenced = persist.config().currentTimeMicros(); + + var cleanupParams = CleanupParams.builder().expectedObjCount(1).build(); + var cleanup = createCleanup(cleanupParams); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThatThrownBy(referencedObjectsResolver::resolve) + .isInstanceOf(MustRestartWithBiggerFilterException.class); + + var newCleanupParams = cleanupParams.withIncreasedExpectedObjCount(); + + soft.assertThat(cleanupParams.expectedObjCount()) + .isLessThan(newCleanupParams.expectedObjCount()); + soft.assertThat( + CleanupParams.builder() + .from(cleanupParams) + .expectedObjCount(newCleanupParams.expectedObjCount()) + .build()) + .isEqualTo(newCleanupParams); + + cleanup = 
createCleanup(newCleanupParams); + referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + referencedObjectsResolver = cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThatCode(referencedObjectsResolver::resolve).doesNotThrowAnyException(); + } + + @Test + void estimatedHeapPressure() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var maxObjReferenced = persist.config().currentTimeMicros(); + + var cleanup = createCleanup(CleanupParams.builder().build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + + soft.assertThat(referencedObjectsResolver.estimatedHeapPressure()).isGreaterThan(1L); + + var resolveResult = referencedObjectsResolver.resolve(); + var purge = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + + soft.assertThat(purge.estimatedHeapPressure()) + .isGreaterThan(1L) + .isLessThan(referencedObjectsResolver.estimatedHeapPressure()); + } + + @Test + void againstEmptyRepository() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L, + // HEADs ("main" has EMPTY_OBJ_ID) + 2L, + // commits + 3L, + // unique commits + 3L, + // queued objs + 2L, + // objs + 2L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly(Optional.empty(), 5L, 0L); + } + + @Test + void purgeDeleteRefObjs() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + for (int i = 0; i < 10; i++) { + referenceLogic.createReference("kept-" + i, EMPTY_OBJ_ID, null); + } + for (int i = 0; i < 10; i++) { + referenceLogic.createReference("deleted-" + i, EMPTY_OBJ_ID, null); + } + + var resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 20L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 3 commits (empty repo) + 20 created references CommitObjs + 3L + 20L, + 3L + 20L, + // 2 objs (empty repo) + 20 created RefObj's + 2L + 20L, + 2L + 20L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 5L + 20L + 20L, + // Nothing to delete + 0L); + + for (int i = 0; i < 10; i++) { + 
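+      // Delete the 10 "deleted-*" references. Their RefObj's and the create/delete CommitObj's
+      // remain reachable via the internal "int/refs" commit log until that log is rewritten below,
+      // so the following cleanup run must not purge them yet.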
referenceLogic.deleteReference("deleted-" + i, EMPTY_OBJ_ID); + } + + resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 10L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 3 commits (empty repo) + 20 created references CommitObjs + 10 deleted references + // CommitObjs + 3L + 20L + 10L, + 3L + 20L + 10L, + // 2 objs (empty repo) + 20 created RefObj's + 2L + 20L, + 2L + 20L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 5L + 20L + 20L + 10L, + // RefObj's are NOT deleted, because those are referenced via the `int/refs` commit log + // chain + 0L); + + // Shorten the "int/refs" history / make RefObj's eligible for cleanup + + var refRefs = requireNonNull(persist.fetchReference(InternalRef.REF_REFS.name())); + var newRefRefs = referenceLogic.rewriteCommitLog(refRefs, (num, commit) -> true); + soft.assertThat(newRefRefs.pointer()).isNotEqualTo(refRefs.pointer()); + var refRefsHead = requireNonNull(commitLogic.fetchCommit(newRefRefs.pointer())); + soft.assertThat(refRefsHead.directParent()).isEqualTo(EMPTY_OBJ_ID); + + resolveAndPurge = resolveAndPurge(persist.config().currentTimeMicros()); + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // 3 references (empty repo) + 20 created references + 3L + 10L, + // 2 queued commits (2 internal refs, "main" + all created refs hava EMPTY_OBJ_ID + 2L, + // 2 CommitObjs (one less than "empty repo": the commit to create the "main" reference + // has been "squashed") + 2L, + 2L, + // 2 objs (empty repo) + 10 "existing" RefObj's + 2L + 10L, + 2L + 10L); + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), + // 5 (empty repo) + 20 CommitObj + 20 RefObj + 10 CommitObj + 1 re-written CommitObj + 5L + 20L + 20L + 10L + 1L, + // RefObj's are deleted, because those are referenced via the `int/refs` commit log + // chain, CommitObj's from the create/delete reference operations: + // 10 RefObj's + 30 CommitObj + 2 CommitObj + 10L + 30L + 2L); + } + + @Test + void againstEmptyRepositoryWithGarbage() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var unreferenced = new ArrayList(); + var keptUnreferenced = new ArrayList(); + var referencedCommits = new ArrayList(); + var referenced = new ArrayList(); + var contents = 0; + + for (int i = 0; i < 25; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + 
soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + for (int i = 0; i < 25; i++) { + var cid = randomUUID(); + var obj = + contentValue( + cid.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("dummy " + i, cid.toString()))); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + + // 10 new references + // 10 new RefObj + for (int i = 0; i < 10; i++) { + var head = EMPTY_OBJ_ID; + for (int i1 = 0; i1 < 20; i1++) { + var cid1 = randomUUID(); + var cid2 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString()))); + var obj2 = + contentValue( + cid2.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "1"), + 42, + obj1.id(), + null, + cid1)) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "2"), + 42, + obj2.id(), + null, + cid2)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build()) + .message("commit " + i1 + " on " + i) + .build(), + List.of(obj1, obj2)); + head = requireNonNull(commit).id(); + + referencedCommits.add(head); + referenced.add(obj1.id()); + referenced.add(obj2.id()); + contents += 2; + } + + var extendedInfo = + stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i)); + soft.assertThat(persist.storeObj(extendedInfo)).isTrue(); + referenced.add(extendedInfo.id()); + + referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id()); + } + + var maxObjReferenced = persist.config().currentTimeMicros(); + + // Unreferenced, but newer than 'maxObjReferenced' + for (int i = 100; i < 125; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + for (int i = 100; i < 125; i++) { + var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + + var resolveAndPurge = resolveAndPurge(maxObjReferenced); + + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L + 10L, + // heads ("main" has EMPTY_OBJ_ID) + 2L + 10L, + // commits + 3L + 10L + referencedCommits.size(), + // unique commits + 3L + 10L + referencedCommits.size(), + // objects + non-existing UniqueObj + 2L + referenced.size() + contents, + 2L + referenced.size()); + + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L); + + soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0]))) + .containsOnlyNulls(); + soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0]))) + 
.doesNotContainNull(); + soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + } + + @Test + void withSecondaryParents() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var secondaryHead = buildNewCommitChain(commitLogic, "secondary"); + var referenceHead = buildNewCommitChain(commitLogic, "main"); + + var mergeCommit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(referenceHead) + .addSecondaryParents(secondaryHead) + .message("merge commit") + .headers(newCommitHeaders().add("created", "foo merge").build()) + .build(), + List.of()); + + referenceLogic.createReference("refs/heads/my-merge-1", requireNonNull(mergeCommit).id(), null); + referenceLogic.createReference("refs/heads/my-merge-2", requireNonNull(mergeCommit).id(), null); + + var maxObjReferenced = persist.config().currentTimeMicros(); + var resolveAndPurge = resolveAndPurge(maxObjReferenced); + + soft.assertThat(resolveAndPurge.resolveResult().stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // references + 3L + 1L + 1L, + // commit heads (all refs HEADs + secondary parent + incl duplicates & EMPTY_OBJ_ID) + 3L + 2L, + // commits (internals + 2x create-ref + 5+5 + 1) + 3L + 2L + 5L + 5L + 1L, + 3L + 2L + 5L + 5L + 1L, + // objects (internals, 2x RefObj + 5+5 contents + 10 non-existing UniqueObj + 2L + 2L + 5L + 5L + 10L, + 2L + 2L + 5L + 5L); + + soft.assertThat(resolveAndPurge.purgeResult().stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly(Optional.empty(), 5L + 13L + 12L, 0L); + } + + private ObjId buildNewCommitChain(CommitLogic commitLogic, String discrim) throws Exception { + var head = EMPTY_OBJ_ID; + for (int i = 0; i < 5; i++) { + var cid1 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + discrim, cid1.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), discrim), + 42, + obj1.id(), + null, + cid1)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + discrim).build()) + .message("commit " + i + " " + discrim) + .build(), + List.of(obj1)); + head = requireNonNull(commit).id(); + } + return head; + } + + ResolvePurgeResult resolveAndPurge(long maxObjReferenced) throws Exception { + var cleanup = createCleanup(CleanupParams.builder().build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + var resolveResult = referencedObjectsResolver.resolve(); + var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + var purgeResult = purgeObjects.purge(); + + return ImmutableResolvePurgeResult.of(resolveResult, purgeResult); + } + + @NessieImmutable + interface ResolvePurgeResult { + ResolveResult resolveResult(); + + PurgeResult purgeResult(); + } +} diff --git 
a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java new file mode 100644 index 0000000000..179cd07052 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestPurgeStatsBuilder.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestPurgeStatsBuilder { + @InjectSoftAssertions SoftAssertions soft; + + @Test + void purgeStatsBuilder() { + var builder = new PurgeStatsBuilder(); + + var expected = ImmutablePurgeStats.builder(); + + builder.started = Instant.EPOCH; + expected.started(Instant.EPOCH); + builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS); + expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS)); + builder.numPurgedObjs = 1; + expected.numPurgedObjs(1); + builder.numScannedObjs = 2; + expected.numScannedObjs(2); + builder.failure = new Exception("hello"); + expected.failure(builder.failure); + + soft.assertThat(builder.build()).isEqualTo(expected.build()); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java new file mode 100644 index 0000000000..1d31a14426 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestRateLimiting.java @@ -0,0 +1,240 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static java.util.Objects.requireNonNull; +import static java.util.UUID.randomUUID; +import static org.projectnessie.nessie.relocated.protobuf.ByteString.copyFromUtf8; +import static org.projectnessie.versioned.storage.cleanup.Cleanup.createCleanup; +import static org.projectnessie.versioned.storage.common.indexes.StoreKey.key; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.Add.commitAdd; +import static org.projectnessie.versioned.storage.common.logic.CreateCommit.newCommitBuilder; +import static org.projectnessie.versioned.storage.common.logic.Logics.commitLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.referenceLogic; +import static org.projectnessie.versioned.storage.common.logic.Logics.repositoryLogic; +import static org.projectnessie.versioned.storage.common.objtypes.CommitHeaders.newCommitHeaders; +import static org.projectnessie.versioned.storage.common.objtypes.CommitType.NORMAL; +import static org.projectnessie.versioned.storage.common.objtypes.ContentValueObj.contentValue; +import static org.projectnessie.versioned.storage.common.objtypes.StringObj.stringData; +import static org.projectnessie.versioned.storage.common.persist.ObjId.EMPTY_OBJ_ID; +import static org.projectnessie.versioned.testworker.OnRefOnly.onRef; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.projectnessie.versioned.storage.common.objtypes.Compression; +import org.projectnessie.versioned.storage.common.persist.ObjId; +import org.projectnessie.versioned.storage.common.persist.Persist; +import org.projectnessie.versioned.storage.testextension.NessiePersist; +import org.projectnessie.versioned.storage.testextension.PersistExtension; +import org.projectnessie.versioned.store.DefaultStoreWorker; + +@ExtendWith({PersistExtension.class, SoftAssertionsExtension.class}) +public class TestRateLimiting { + @InjectSoftAssertions protected SoftAssertions soft; + + @NessiePersist protected Persist persist; + + @Test + void productionImplementations() { + soft.assertThat(RateLimit.create(0)).extracting(RateLimit::toString).isEqualTo("unlimited"); + soft.assertThat(RateLimit.create(-42)).extracting(RateLimit::toString).isEqualTo("unlimited"); + soft.assertThat(RateLimit.create(42)).extracting(RateLimit::toString).isEqualTo("up to 42"); + + soft.assertThatCode(() -> RateLimit.create(0).acquire()).doesNotThrowAnyException(); + soft.assertThatCode(() -> RateLimit.create(42).acquire()).doesNotThrowAnyException(); + } + + @Test + void againstEmptyRepositoryWithGarbage() throws Exception { + soft.assertThat(repositoryLogic(persist).repositoryExists()).isTrue(); + + var referenceLogic = referenceLogic(persist); + var commitLogic = commitLogic(persist); + + var unreferenced = new ArrayList(); + var keptUnreferenced = new ArrayList(); + var referencedCommits = new ArrayList(); + var referenced = new ArrayList(); + var contents = 0; + + for (int i = 0; i < 25; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + for 
(int i = 0; i < 25; i++) { + var cid = randomUUID(); + var obj = + contentValue( + cid.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("dummy " + i, cid.toString()))); + soft.assertThat(persist.storeObj(obj)).isTrue(); + unreferenced.add(obj.id()); + } + + // 10 new references + // 10 new RefObj + for (int i = 0; i < 10; i++) { + var head = EMPTY_OBJ_ID; + for (int i1 = 0; i1 < 20; i1++) { + var cid1 = randomUUID(); + var cid2 = randomUUID(); + var obj1 = + contentValue( + cid1.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 1", cid1.toString()))); + var obj2 = + contentValue( + cid2.toString(), + 127, + DefaultStoreWorker.instance() + .toStoreOnReferenceState(onRef("obj " + i + " " + i1 + " 2", cid2.toString()))); + var commit = + commitLogic.doCommit( + newCommitBuilder() + .commitType(NORMAL) + .parentCommitId(head) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "1"), + 42, + obj1.id(), + null, + cid1)) + .addAdds( + commitAdd( + key("store", "key", Integer.toString(i), Integer.toString(i1), "2"), + 42, + obj2.id(), + null, + cid2)) + .headers(newCommitHeaders().add("created", "foo-" + i + "-" + i1).build()) + .message("commit " + i1 + " on " + i) + .build(), + List.of(obj1, obj2)); + head = requireNonNull(commit).id(); + + referencedCommits.add(head); + referenced.add(obj1.id()); + referenced.add(obj2.id()); + contents += 2; + } + + var extendedInfo = + stringData("ref/foo", Compression.NONE, null, List.of(), copyFromUtf8("ext-info " + i)); + soft.assertThat(persist.storeObj(extendedInfo)).isTrue(); + referenced.add(extendedInfo.id()); + + referenceLogic.createReference("refs/heads/myref-" + i, head, extendedInfo.id()); + } + + var maxObjReferenced = persist.config().currentTimeMicros(); + + // Unreferenced, but newer than 'maxObjReferenced' + for (int i = 100; i < 125; i++) { + var obj = + stringData("foo/bar", Compression.NONE, null, List.of(), copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + for (int i = 100; i < 125; i++) { + var obj = contentValue("cid-" + i, 42, copyFromUtf8("string " + i)); + soft.assertThat(persist.storeObj(obj)).isTrue(); + keptUnreferenced.add(obj.id()); + } + + var acquires = new AtomicLong[4]; + + var cleanup = + createCleanup( + CleanupParams.builder() + .resolveCommitRatePerSecond(1) + .resolveObjRatePerSecond(2) + .purgeScanObjRatePerSecond(3) + .purgeDeleteObjRatePerSecond(4) + .rateLimitFactory( + i -> { + var a = acquires[i - 1] = new AtomicLong(); + return a::incrementAndGet; + }) + .build()); + var referencedObjectsContext = cleanup.buildReferencedObjectsContext(persist, maxObjReferenced); + var referencedObjectsResolver = + cleanup.createReferencedObjectsResolver(referencedObjectsContext); + var resolveResult = referencedObjectsResolver.resolve(); + + soft.assertThat(acquires) + .extracting(l -> l != null ? 
l.get() : -1L) + .containsExactlyInAnyOrder( + 3L + 10L + referencedCommits.size(), 2L + referenced.size(), -1L, -1L); + soft.assertThat(resolveResult.stats()) + .extracting( + ResolveStats::failure, + ResolveStats::numReferences, + ResolveStats::numCommitChainHeads, + ResolveStats::numCommits, + ResolveStats::numUniqueCommits, + ResolveStats::numQueuedObjs, + ResolveStats::numObjs) + .containsExactly( + Optional.empty(), + // refs + 3L + 10L, + // heads ("main" has EMPTY_OBJ_ID) + 2L + 10L, + // commits + 3L + 10L + referencedCommits.size(), + // unique commits + 3L + 10L + referencedCommits.size(), + // objects + 2L + referenced.size() + contents, + 2L + referenced.size()); + + var purgeObjects = cleanup.createPurgeObjects(resolveResult.purgeObjectsContext()); + var purgeResult = purgeObjects.purge(); + + soft.assertThat(acquires) + .extracting(AtomicLong::get) + .containsExactlyInAnyOrder( + 3L + 10L + referencedCommits.size(), + 2L + referenced.size(), + 5L + 100L + 20L + referencedCommits.size() + referenced.size(), + 50L); + soft.assertThat(purgeResult.stats()) + .extracting(PurgeStats::failure, PurgeStats::numScannedObjs, PurgeStats::numPurgedObjs) + .containsExactly( + Optional.empty(), 5L + 100L + 20L + referencedCommits.size() + referenced.size(), 50L); + + soft.assertThat(persist.fetchObjsIfExist(unreferenced.toArray(new ObjId[0]))) + .containsOnlyNulls(); + soft.assertThat(persist.fetchObjsIfExist(keptUnreferenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + soft.assertThat(persist.fetchObjsIfExist(referenced.toArray(new ObjId[0]))) + .doesNotContainNull(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java new file mode 100644 index 0000000000..1a8388ab82 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestReferencedObjectsFilterImpl.java @@ -0,0 +1,110 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import static org.projectnessie.versioned.storage.common.persist.ObjId.objIdFromByteArray; +import static org.projectnessie.versioned.storage.common.persist.ObjId.randomObjId; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ThreadLocalRandom; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; +import org.projectnessie.versioned.storage.common.persist.ObjId; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestReferencedObjectsFilterImpl { + @InjectSoftAssertions SoftAssertions soft; + + @Test + public void emptyFilterContainsNothing() { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().build()); + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isFalse(); + for (int i = 0; i < 100; i++) { + ObjId id = randomObjId(); + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isFalse(); + } + } + + @Test + public void filterContainsAdded() { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().build()); + + soft.assertThat(filter.estimatedHeapPressure()).isGreaterThan(1L); + + soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + soft.assertThat(filter.markReferenced(ObjId.EMPTY_OBJ_ID)).isFalse(); + + Set<ObjId> ids = new HashSet<>(3000); + for (int i = 0; i < 1000; i++) { + ids.add(randomObjId()); + } + + for (int i = 0; i < 1000; i++) { + byte[] bytes = new byte[4 + ThreadLocalRandom.current().nextInt(33)]; + ThreadLocalRandom.current().nextBytes(bytes); + ids.add(objIdFromByteArray(bytes)); + } + + for (ObjId id : ids) { + // There is a theoretical chance that this assertion fails, but that chance is extremely low. + // (We're adding 2000 object IDs to a bloom filter with an expected object count of 1M and a + // low FPP.) 
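+ // (Back-of-the-envelope, assuming an FPP in the 1e-4 range: a bloom filter sized for 1M entries reserves roughly 19 bits per expected entry, so ~2000 insertions set well under 1% of its bits and the effective false-positive rate stays negligible.) 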
+ soft.assertThat(filter.markReferenced(id)).isTrue(); + soft.assertThat(filter.markReferenced(id)).isFalse(); + } + + soft.assertThat(filter.isProbablyReferenced(ObjId.EMPTY_OBJ_ID)).isTrue(); + for (ObjId id : ids) { + soft.assertThat(filter.isProbablyReferenced(id)).describedAs("id = %s", id).isTrue(); + } + } + + @ParameterizedTest + @ValueSource(ints = {100, 1_000, 10_000}) + public void withinExpectedFpp(int expected) { + ReferencedObjectsFilterImpl filter = + new ReferencedObjectsFilterImpl(CleanupParams.builder().expectedObjCount(expected).build()); + + for (int i = 0; i < expected; i++) { + ObjId id = randomObjId(); + soft.assertThatCode(() -> filter.markReferenced(id)).doesNotThrowAnyException(); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } + + // Adding more IDs than the expected object count must eventually push the filter beyond its + // acceptable FPP, so "withinExpectedFpp" turns false and markReferenced() throws + boolean thrown = false; + for (int i = 0; i < expected / 2; i++) { + ObjId id = randomObjId(); + try { + filter.markReferenced(id); + soft.assertThat(filter.withinExpectedFpp()).isTrue(); + } catch (MustRestartWithBiggerFilterRuntimeException e) { + soft.assertThat(filter.withinExpectedFpp()).isFalse(); + thrown = true; + break; + } + } + soft.assertThat(thrown).isTrue(); + } +} diff --git a/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java new file mode 100644 index 0000000000..3e50f0af62 --- /dev/null +++ b/versioned/storage/cleanup/src/test/java/org/projectnessie/versioned/storage/cleanup/TestResolveStatsBuilder.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2024 Dremio + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.projectnessie.versioned.storage.cleanup; + +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import org.assertj.core.api.SoftAssertions; +import org.assertj.core.api.junit.jupiter.InjectSoftAssertions; +import org.assertj.core.api.junit.jupiter.SoftAssertionsExtension; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; + +@ExtendWith(SoftAssertionsExtension.class) +public class TestResolveStatsBuilder { + @InjectSoftAssertions SoftAssertions soft; + + @Test + void resolveStatsBuilder() { + var builder = new ResolveStatsBuilder(); + + var expected = ImmutableResolveStats.builder(); + + builder.mustRestart = true; + expected.mustRestart(true); + builder.started = Instant.EPOCH; + expected.started(Instant.EPOCH); + builder.ended = Instant.EPOCH.plus(42, ChronoUnit.DAYS); + expected.ended(Instant.EPOCH.plus(42, ChronoUnit.DAYS)); + builder.numCommits = 1; + expected.numCommits(1); + builder.numContents = 2; + expected.numContents(2); + builder.numObjs = 3; + expected.numObjs(3); + builder.numReferences = 4; + expected.numReferences(4); + builder.numUniqueCommits = 5; + expected.numUniqueCommits(5); + builder.numCommitChainHeads = 7; + expected.numCommitChainHeads(7); + builder.numQueuedObjs = 8; + expected.numQueuedObjs(8); + builder.numQueuedObjsBulkFetches = 9; + expected.numQueuedObjsBulkFetches(9); + builder.failure = new Exception("hello"); + expected.failure(builder.failure); + + soft.assertThat(builder.build()).isEqualTo(expected.build()); + } +}